[jira] [Commented] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-17 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17477531#comment-17477531
 ] 

Bruce Wong commented on FLINK-25560:


Hi, [~MartijnVisser] 

Is there anyone else who could help review this issue?

Thanks.

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png, 
> image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png, 
> image-2022-01-11-20-07-43-900.png, image-2022-01-11-20-09-29-074.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Description: 
h1. Motivation

When we synchronize data from MySQL to HBase, we found that deleting a row in 
MySQL does not remove all of its versions in HBase, which leads to incorrect 
semantics. So we want to add a parameter that controls whether a delete removes 
only the latest version or all versions.
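
To make the intended semantics concrete, the two behaviours map to different delete calls at the plain HBase client level. The sketch below is only an illustration of that HBase behaviour, not the connector's actual implementation; the class and method names are made up, and the table, family and column names are taken from the example further down.
{code:java}
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteModeSketch {

    // "connection" is assumed to be an already opened HBase Connection.
    static void retract(Connection connection, String rowkey, boolean allVersions) throws Exception {
        try (Table table = connection.getTable(TableName.valueOf("dim_hbase"))) {
            Delete delete = new Delete(Bytes.toBytes(rowkey));
            if (allVersions) {
                // addColumns(...) deletes every version of the cell.
                delete.addColumns(Bytes.toBytes("cf"), Bytes.toBytes("name"));
            } else {
                // addColumn(...) deletes only the latest version, so an older
                // version becomes visible again after the delete.
                delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"));
            }
            table.delete(delete);
        }
    }
}
{code}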
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//TableConfig config = tEnv.getConfig();
//config.setIdleStateRetention(Duration.ofHours(2));

String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
"  id   STRING,\n" +
"  category_id   STRING,\n" +
"  upc   STRING,\n" +
"  `name` STRING,\n" +
"  price_cents   STRING,\n" +
"  original_price_cents  STRING,\n" +
"  short_desc STRING,\n" +
"  desc  STRING,\n" +
"  cover_url STRING,\n" +
"  created_at STRING,\n" +
"  updated_at STRING,\n" +
"  deleted_at STRING,\n" +
"  extra STRING,\n" +
"  status STRING,\n" +
"  scholarship_cents STRING,\n" +
"  is_payback STRING,\n" +
"  is_support_iap STRING,\n" +
"  iap_product_id STRING,\n" +
"  neo_product_code  STRING,\n" +
"  paid_redirect_url STRING,\n" +
"  subscription_type STRING\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'dim-bundles',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'properties.group.id' = 'vvp_dev',\n" +
"  'scan.startup.mode' = 'latest-offset',\n" +
"  'value.debezium-json.schema-include' = 'true',\n" +
"  'value.format' = 'debezium-json',\n" +
"  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
"rowkey  STRING,\n" +
"cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING, neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
") with (\n" +
"  'connector'='hbase-2.2',\n" +
"  'table-name'='dim_hbase',\n" +
"  'zookeeper.quorum'='localhost:2181',\n" +
"  'sink.buffer-flush.max-size' = '0',\n" +
"  'sink.buffer-flush.max-rows' = '1',\n" +
"  'sink.delete.mode' = 'all-versions'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
"  upc as rowkey,\n" +
"  ROW(\n" +
"id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
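
For reference, a connector option like this is usually declared through Flink's ConfigOptions API. The snippet below is only a sketch of what such a declaration could look like; the option key matches this proposal, but the value names and the default are assumptions rather than the actual implementation.
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class HBaseDeleteModeOptions {

    // Hypothetical declaration; the default keeps today's behaviour
    // (only the latest version is deleted on retraction).
    public static final ConfigOption<String> SINK_DELETE_MODE =
            ConfigOptions.key("sink.delete.mode")
                    .stringType()
                    .defaultValue("latest-version")
                    .withDescription(
                            "How a retraction is applied to HBase: 'latest-version' deletes "
                                    + "only the newest cell version, 'all-versions' deletes every "
                                    + "version of the affected cells.");
}
{code}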
h1. Test
 # After the test, we found that the delete removed only the column family (CF) specified in the Flink SQL DML statement; other CFs were not affected.

!image-2022-01-11-20-02-17-780.png|width=793,height=294!

!image-2022-01-11-20-04-48-299.png|width=793,height=338!

!image-2022-01-11-20-05-53-217.png|width=856,height=479!

Next, a table with two CFs where the DML writes to only one of them:

!image-2022-01-11-20-07-43-900.png|width=859,height=422!

The data of cf2 is still there, so the deleted CF was indeed the one specified in the Flink SQL DML statement.
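
For anyone reproducing the test, the remaining cell versions can also be checked programmatically with a raw scan instead of screenshots. This is a minimal sketch using the plain HBase client; the table name follows the example above and the connection handling is assumed.
{code:java}
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RawScanCheck {

    // Prints every stored cell version (including delete markers) so it is easy
    // to see whether old versions survived a retraction.
    static void dumpVersions(Connection connection) throws Exception {
        try (Table table = connection.getTable(TableName.valueOf("dim_hbase"));
                ResultScanner scanner =
                        table.getScanner(new Scan().setRaw(true).readVersions(10))) {
            for (Result result : scanner) {
                for (Cell cell : result.listCells()) {
                    System.out.printf(
                            "row=%s cf=%s qualifier=%s ts=%d%n",
                            Bytes.toString(CellUtil.cloneRow(cell)),
                            Bytes.toString(CellUtil.cloneFamily(cell)),
                            Bytes.toString(CellUtil.cloneQualifier(cell)),
                            cell.getTimestamp());
                }
            }
        }
    }
}
{code}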

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-09-29-074.png

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png, 
> image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png, 
> image-2022-01-11-20-07-43-900.png, image-2022-01-11-20-09-29-074.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
>   

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-07-43-900.png

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png, 
> image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png, 
> image-2022-01-11-20-07-43-900.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
> "  

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-05-53-217.png

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png, 
> image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
> "  'zookeeper.quorum'='localhost:2181',\n" +
> 

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-04-48-299.png

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png, 
> image-2022-01-11-20-04-48-299.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
> "  'zookeeper.quorum'='localhost:2181',\n" +
> "  

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Description: 
h1. Motivation

When we synchronize data from MySQL to HBase, we found that deleting a row in 
MySQL does not remove all of its versions in HBase, which leads to incorrect 
semantics. So we want to add a parameter that controls whether a delete removes 
only the latest version or all versions.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//TableConfig config = tEnv.getConfig();
//config.setIdleStateRetention(Duration.ofHours(2));

String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
"  id   STRING,\n" +
"  category_id   STRING,\n" +
"  upc   STRING,\n" +
"  `name` STRING,\n" +
"  price_cents   STRING,\n" +
"  original_price_cents  STRING,\n" +
"  short_desc STRING,\n" +
"  desc  STRING,\n" +
"  cover_url STRING,\n" +
"  created_at STRING,\n" +
"  updated_at STRING,\n" +
"  deleted_at STRING,\n" +
"  extra STRING,\n" +
"  status STRING,\n" +
"  scholarship_cents STRING,\n" +
"  is_payback STRING,\n" +
"  is_support_iap STRING,\n" +
"  iap_product_id STRING,\n" +
"  neo_product_code  STRING,\n" +
"  paid_redirect_url STRING,\n" +
"  subscription_type STRING\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'dim-bundles',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'properties.group.id' = 'vvp_dev',\n" +
"  'scan.startup.mode' = 'latest-offset',\n" +
"  'value.debezium-json.schema-include' = 'true',\n" +
"  'value.format' = 'debezium-json',\n" +
"  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
"rowkey  STRING,\n" +
"cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING, neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
") with (\n" +
"  'connector'='hbase-2.2',\n" +
"  'table-name'='dim_hbase',\n" +
"  'zookeeper.quorum'='localhost:2181',\n" +
"  'sink.buffer-flush.max-size' = '0',\n" +
"  'sink.buffer-flush.max-rows' = '1',\n" +
"  'sink.delete.mode' = 'all-versions'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
"  upc as rowkey,\n" +
"  ROW(\n" +
"id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
h1. Test
 # After the test, we found that the delete removed only the column family (CF) specified in the Flink SQL DML statement; other CFs were not affected.

!image-2022-01-11-20-02-17-780.png|width=793,height=294!

 
h1. Reference

Please look at the following task link.

FLINK-25330
 

  was:
h1. Motivation

When we synchronize data from mysql to HBase, we find that when deleting data 
from mysql, HBase cannot delete all versions, which leads to incorrect 
semantics. So we want to add a 

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-02-17-780.png

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
> "  'zookeeper.quorum'='localhost:2181',\n" +
> "  'sink.buffer-flush.max-size' = '0',\n" +
> "  

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-11 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Description: 
h1. Motivation

When we synchronize data from MySQL to HBase, we found that deleting a row in 
MySQL does not remove all of its versions in HBase, which leads to incorrect 
semantics. So we want to add a parameter that controls whether a delete removes 
only the latest version or all versions.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//TableConfig config = tEnv.getConfig();
//config.setIdleStateRetention(Duration.ofHours(2));

String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
"  id   STRING,\n" +
"  category_id   STRING,\n" +
"  upc   STRING,\n" +
"  `name` STRING,\n" +
"  price_cents   STRING,\n" +
"  original_price_cents  STRING,\n" +
"  short_desc STRING,\n" +
"  desc  STRING,\n" +
"  cover_url STRING,\n" +
"  created_at STRING,\n" +
"  updated_at STRING,\n" +
"  deleted_at STRING,\n" +
"  extra STRING,\n" +
"  status STRING,\n" +
"  scholarship_cents STRING,\n" +
"  is_payback STRING,\n" +
"  is_support_iap STRING,\n" +
"  iap_product_id STRING,\n" +
"  neo_product_code  STRING,\n" +
"  paid_redirect_url STRING,\n" +
"  subscription_type STRING\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'dim-bundles',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'properties.group.id' = 'vvp_dev',\n" +
"  'scan.startup.mode' = 'latest-offset',\n" +
"  'value.debezium-json.schema-include' = 'true',\n" +
"  'value.format' = 'debezium-json',\n" +
"  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
"rowkey  STRING,\n" +
"cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING, neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
") with (\n" +
"  'connector'='hbase-2.2',\n" +
"  'table-name'='dim_hbase',\n" +
"  'zookeeper.quorum'='localhost:2181',\n" +
"  'sink.buffer-flush.max-size' = '0',\n" +
"  'sink.buffer-flush.max-rows' = '1',\n" +
"  'sink.delete.mode' = 'all-versions'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
"  upc as rowkey,\n" +
"  ROW(\n" +
"id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
h1. Test

 
h1. Reference

Please look at the following task link.

FLINK-25330
 

  was:
h1. Motivation

When we synchronize data from mysql to HBase, we find that when deleting data 
from mysql, HBase cannot delete all versions, which leads to incorrect 
semantics. So we want to add a parameter to control deleting the latest version 
or deleting all versions.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import 

[jira] [Comment Edited] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2022-01-09 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17471650#comment-17471650
 ] 

Bruce Wong edited comment on FLINK-25330 at 1/10/22, 3:19 AM:
--

Hi [~jingge] ,

I created a new task to solve this problem. Please take a look at that. 
FLINK-25560

Thanks.


was (Author: bruce wong):
Hi [~jingge] ,

I created a new task to solve this problem. Please take a look at that. 
[FLINK-25560|https://issues.apache.org/jira/browse/FLINK-25560]

> Flink SQL doesn't retract all versions of Hbase data
> 
>
> Key: FLINK-25330
> URL: https://issues.apache.org/jira/browse/FLINK-25330
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: Flink-SQL-Test.zip, bundle_data.zip, bundle_data1.zip, 
> image-2021-12-15-20-05-18-236.png, test_res.png, test_res2.png, test_res_1.png
>
>
> h2. Background
> When we use CDC to synchronize MySQL data to HBase, we found that HBase 
> deletes only the latest version of the specified rowkey when the MySQL row is 
> deleted. The older versions still exist, so downstream readers end up using the 
> wrong data. I think this is a bug in the HBase connector.
> The following figure shows the HBase data before and after the MySQL row is 
> deleted.
> !image-2021-12-15-20-05-18-236.png|width=910,height=669!
>  
> h2.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2022-01-09 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17471650#comment-17471650
 ] 

Bruce Wong commented on FLINK-25330:


Hi [~jingge] ,

I created a new task to solve this problem. Please take a look at that. 
[FLINK-25560|https://issues.apache.org/jira/browse/FLINK-25560]

> Flink SQL doesn't retract all versions of Hbase data
> 
>
> Key: FLINK-25330
> URL: https://issues.apache.org/jira/browse/FLINK-25330
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Major
>  Labels: pull-request-available
> Attachments: Flink-SQL-Test.zip, bundle_data.zip, bundle_data1.zip, 
> image-2021-12-15-20-05-18-236.png, test_res.png, test_res2.png, test_res_1.png
>
>
> h2. Background
> When we use CDC to synchronize MySQL data to HBase, we found that HBase 
> deletes only the latest version of the specified rowkey when the MySQL row is 
> deleted. The older versions still exist, so downstream readers end up using the 
> wrong data. I think this is a bug in the HBase connector.
> The following figure shows the HBase data before and after the MySQL row is 
> deleted.
> !image-2021-12-15-20-05-18-236.png|width=910,height=669!
>  
> h2.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-06 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Component/s: Connectors / HBase

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / HBase
>Affects Versions: 1.15.0
>Reporter: Bruce Wong
>Priority: Major
>  Labels: pull-request-available
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
> "  'zookeeper.quorum'='localhost:2181',\n" +
> "  'sink.buffer-flush.max-size' = '0',\n" +
> "  'sink.buffer-flush.max-rows' = '1',\n" +
> "  

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-06 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Affects Version/s: 1.15.0

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.15.0
>Reporter: Bruce Wong
>Priority: Major
>  Labels: pull-request-available
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that deleting a row in 
> MySQL does not remove all of its versions in HBase, which leads to incorrect 
> semantics. So we want to add a parameter that controls whether a delete removes 
> only the latest version or all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
> public static void main(String[] args) {
> Configuration cfg = new Configuration();
> cfg.setBoolean(LOCAL_START_WEBSERVER, true);
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
> env.setParallelism(1);
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //TableConfig config = tEnv.getConfig();
> //config.setIdleStateRetention(Duration.ofHours(2));
> String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
> "  id   STRING,\n" +
> "  category_id   STRING,\n" +
> "  upc   STRING,\n" +
> "  `name` STRING,\n" +
> "  price_cents   STRING,\n" +
> "  original_price_cents  STRING,\n" +
> "  short_desc STRING,\n" +
> "  desc  STRING,\n" +
> "  cover_url STRING,\n" +
> "  created_at STRING,\n" +
> "  updated_at STRING,\n" +
> "  deleted_at STRING,\n" +
> "  extra STRING,\n" +
> "  status STRING,\n" +
> "  scholarship_cents STRING,\n" +
> "  is_payback STRING,\n" +
> "  is_support_iap STRING,\n" +
> "  iap_product_id STRING,\n" +
> "  neo_product_code  STRING,\n" +
> "  paid_redirect_url STRING,\n" +
> "  subscription_type STRING\n" +
> ") WITH (\n" +
> "  'connector' = 'kafka',\n" +
> "  'topic' = 'dim-bundles',\n" +
> "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
> "  'properties.group.id' = 'vvp_dev',\n" +
> "  'scan.startup.mode' = 'latest-offset',\n" +
> "  'value.debezium-json.schema-include' = 'true',\n" +
> "  'value.format' = 'debezium-json',\n" +
> "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
> ")";
> String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
> "rowkey  STRING,\n" +
> "cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
> ") with (\n" +
> "  'connector'='hbase-2.2',\n" +
> "  'table-name'='dim_hbase',\n" +
> "  'zookeeper.quorum'='localhost:2181',\n" +
> "  'sink.buffer-flush.max-size' = '0',\n" +
> "  'sink.buffer-flush.max-rows' = '1',\n" +
> "  'sink.delete.mode' = 'all-versions'\n" +
> ")";

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-06 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Description: 
h1. Motivation

When we synchronize data from MySQL to HBase, we found that deleting a row in 
MySQL does not remove all of its versions in HBase, which leads to incorrect 
semantics. So we want to add a parameter that controls whether a delete removes 
only the latest version or all versions.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//TableConfig config = tEnv.getConfig();
//config.setIdleStateRetention(Duration.ofHours(2));

String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
"  id   STRING,\n" +
"  category_id   STRING,\n" +
"  upc   STRING,\n" +
"  `name` STRING,\n" +
"  price_cents   STRING,\n" +
"  original_price_cents  STRING,\n" +
"  short_desc STRING,\n" +
"  desc  STRING,\n" +
"  cover_url STRING,\n" +
"  created_at STRING,\n" +
"  updated_at STRING,\n" +
"  deleted_at STRING,\n" +
"  extra STRING,\n" +
"  status STRING,\n" +
"  scholarship_cents STRING,\n" +
"  is_payback STRING,\n" +
"  is_support_iap STRING,\n" +
"  iap_product_id STRING,\n" +
"  neo_product_code  STRING,\n" +
"  paid_redirect_url STRING,\n" +
"  subscription_type STRING\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'dim-bundles',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'properties.group.id' = 'vvp_dev',\n" +
"  'scan.startup.mode' = 'latest-offset',\n" +
"  'value.debezium-json.schema-include' = 'true',\n" +
"  'value.format' = 'debezium-json',\n" +
"  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
"rowkey  STRING,\n" +
"cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING, neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
") with (\n" +
"  'connector'='hbase-2.2',\n" +
"  'table-name'='dim_hbase',\n" +
"  'zookeeper.quorum'='localhost:2181',\n" +
"  'sink.buffer-flush.max-size' = '0',\n" +
"  'sink.buffer-flush.max-rows' = '1',\n" +
"  'sink.delete.mode' = 'all-versions'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
"  upc as rowkey,\n" +
"  ROW(\n" +
"id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
h1. Reference

Please look at the following task link.

FLINK-25330
 

  was:
h1. Motivation

When we synchronize data from mysql to HBase, we find that when deleting data 
from mysql, HBase cannot delete all versions, which leads to incorrect 
semantics. So we want to add a parameter to control deleting the latest version 
or deleting all versions.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-06 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Description: 
h1. Motivation

When we synchronize data from MySQL to HBase, we found that deleting a row in 
MySQL does not remove all of its versions in HBase, which leads to incorrect 
semantics. So we want to add a parameter that controls whether a delete removes 
only the latest version or all versions.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//TableConfig config = tEnv.getConfig();
//config.setIdleStateRetention(Duration.ofHours(2));

String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
"  id   STRING,\n" +
"  category_id   STRING,\n" +
"  upc   STRING,\n" +
"  `name` STRING,\n" +
"  price_cents   STRING,\n" +
"  original_price_cents  STRING,\n" +
"  short_desc STRING,\n" +
"  desc  STRING,\n" +
"  cover_url STRING,\n" +
"  created_at STRING,\n" +
"  updated_at STRING,\n" +
"  deleted_at STRING,\n" +
"  extra STRING,\n" +
"  status STRING,\n" +
"  scholarship_cents STRING,\n" +
"  is_payback STRING,\n" +
"  is_support_iap STRING,\n" +
"  iap_product_id STRING,\n" +
"  neo_product_code  STRING,\n" +
"  paid_redirect_url STRING,\n" +
"  subscription_type STRING\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'dim-bundles',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'properties.group.id' = 'vvp_dev',\n" +
"  'scan.startup.mode' = 'latest-offset',\n" +
"  'value.debezium-json.schema-include' = 'true',\n" +
"  'value.format' = 'debezium-json',\n" +
"  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
"rowkey  STRING,\n" +
"cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING, neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
") with (\n" +
"  'connector'='hbase-2.2',\n" +
"  'table-name'='dim_hbase',\n" +
"  'zookeeper.quorum'='localhost:2181',\n" +
"  'sink.buffer-flush.max-size' = '0',\n" +
"  'sink.buffer-flush.max-rows' = '1',\n" +
"  'sink.delete.mode' = 'all-versions'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
"  upc as rowkey,\n" +
"  ROW(\n" +
"id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
h1. Reference

Please look at the following task link.

https://issues.apache.org/jira/browse/FLINK-25330

[FLINK-25330|https://issues.apache.org/jira/browse/FLINK-25330]

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: 

[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode

2022-01-06 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
---
Summary: Add "sink.delete.mode" in HBase sql connector for retracting the 
latest version or all versions in changelog mode  (was: Add )

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
>  Issue Type: Improvement
>Reporter: Bruce Wong
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25560) Add

2022-01-06 Thread Bruce Wong (Jira)
Bruce Wong created FLINK-25560:
--

 Summary: Add 
 Key: FLINK-25560
 URL: https://issues.apache.org/jira/browse/FLINK-25560
 Project: Flink
  Issue Type: Improvement
Reporter: Bruce Wong






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-24 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17464946#comment-17464946
 ] 

Bruce Wong commented on FLINK-25330:


Hi, [~jingge] 

Thanks for your reply. I updated the test data (bundle_data1.zip), and the test 
result is test_res2.png. The test results are the same as before.
The test code and configuration are as follows.

 
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
TableConfig config = tEnv.getConfig();
//config.setIdleStateRetention(Duration.ofHours(2));

String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
"  id   STRING,\n" +
"  category_id   STRING,\n" +
"  upc   STRING,\n" +
"  `name`STRING,\n" +
"  price_cents   STRING,\n" +
"  original_price_cents  STRING,\n" +
"  short_descSTRING,\n" +
"  desc  STRING,\n" +
"  cover_url STRING,\n" +
"  created_atSTRING,\n" +
"  updated_atSTRING,\n" +
"  deleted_atSTRING,\n" +
"  extra STRING,\n" +
"  statusSTRING,\n" +
"  scholarship_cents STRING,\n" +
"  is_paybackSTRING,\n" +
"  is_support_iapSTRING,\n" +
"  iap_product_idSTRING,\n" +
"  neo_product_code  STRING,\n" +
"  paid_redirect_url STRING,\n" +
"  subscription_type STRING\n" +
") WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'topic' = 'dim-bundles',\n" +
"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
"  'properties.group.id' = 'vvp_dev',\n" +
"  'scan.startup.mode' = 'latest-offset',\n" +
"  'value.debezium-json.schema-include' = 'true',\n" +
"  'value.format' = 'debezium-json',\n" +
"  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
"rowkey  STRING,\n" +
"cf ROW\n" 
+
") with (\n" +
"  'connector'='hbase-2.2',\n" +
"  'table-name'='dim_hbase',\n" +
"  'zookeeper.quorum'='localhost:2181',\n" +
"  'sink.buffer-flush.max-size' = '0',\n" +
"  'sink.buffer-flush.max-rows' = '1'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
"  upc as rowkey,\n" +
"  ROW(\n" +
"id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
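To check what actually remains in HBase after the delete, the surviving cell versions of the affected rowkey can be read back with the plain HBase 2.x client. A minimal sketch, assuming the table and column family from the DDL above and a local ZooKeeper; the rowkey value is illustrative.

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DumpVersions {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("dim_hbase"))) {
            Get get = new Get(Bytes.toBytes("some-upc")); // rowkey under test
            get.readVersions(10);                         // return up to 10 versions per cell
            Result result = table.get(get);
            for (Cell cell : result.rawCells()) {
                System.out.printf("%s:%s @ %d = %s%n",
                        Bytes.toString(CellUtil.cloneFamily(cell)),
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        cell.getTimestamp(),
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}
{code}

If the sink deleted all versions, this prints nothing for the deleted row; with the behaviour reported in this issue it still prints the older cells.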


[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-24 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: test_res2.png



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-24 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: bundle_data1.zip



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: test_res_1.png



[jira] [Comment Edited] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463199#comment-17463199
 ] 

Bruce Wong edited comment on FLINK-25330 at 12/21/21, 12:14 PM:


Hi [~jingge] ,

bundle_data.zip is the test data; it contains inserts, updates, and deletes. Flink-SQL-Test.zip is my test code, and test_res.png shows the test results.

I also tested the code from the MR; test_res_1.png shows the result I expected.


was (Author: bruce wong):
Hi [~jingge] ,

bundle_data.zip is the test data; it contains inserts, updates, and deletes. Flink-SQL-Test.zip is my test code. Here are the test results.

Please look at test_res.png.



[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463199#comment-17463199
 ] 

Bruce Wong commented on FLINK-25330:


Hi [~jingge] ,

bundle_data.zip is the test data; it contains inserts, updates, and deletes. Flink-SQL-Test.zip is my test code. Here are the test results.

Please look at test_res.png.



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: test_res.png



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: bundle_data.zip



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: (was: image-2021-12-21-20-04-47-780.png)



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: image-2021-12-21-20-04-47-780.png



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-21 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: Flink-SQL-Test.zip



[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-16 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461166#comment-17461166
 ] 

Bruce Wong commented on FLINK-25330:


Hi, [~jingge] 

As for HBase usage in the CDC scenario, I can prepare a Docker example later.
I would also like to know in which cases, when using Flink with HBase, one would want to delete only the latest version and keep the previous ones.

Thanks for your reply.



[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-16 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460647#comment-17460647
 ] 

Bruce Wong commented on FLINK-25330:


Hi, [~jingge],

Could you help me review the MR? Thanks.



[jira] [Comment Edited] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460432#comment-17460432
 ] 

Bruce Wong edited comment on FLINK-25330 at 12/16/21, 6:11 AM:
---

Hi,  [~wenlong.lwl] 

In my opinion, if the user deletes data in MySQL, the old versions of the HBase data should not be retained either, because keeping them leads to incorrect semantics when joining against the HBase data before and after an HBase flush.


was (Author: bruce wong):
Hi, Wenlong Lyu

In my opinion, if the user deletes data in MySQL, the old versions of the HBase data should not be retained either, because keeping them leads to incorrect semantics when joining against the HBase data before and after an HBase flush.



[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460432#comment-17460432
 ] 

Bruce Wong commented on FLINK-25330:


Hi, Wenlong Lyu

In my opinion, if the user deletes data in MySQL, the old versions of the HBase data should not be retained either, because keeping them leads to incorrect semantics when joining against the HBase data before and after an HBase flush.
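The effect can also be reproduced with the plain HBase client, independent of Flink: write two versions of a cell, delete only the latest one, and read the row again. A minimal sketch; the table, family, and values are illustrative, and the column family is assumed to keep more than one version.

{code:java}
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class StaleReadAfterLatestVersionDelete {
    public static void main(String[] args) throws Exception {
        byte[] row = Bytes.toBytes("upc-1");
        byte[] cf = Bytes.toBytes("cf");
        byte[] q = Bytes.toBytes("name");

        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("dim_hbase"))) {
            // Two upserts of the same cell create two versions.
            table.put(new Put(row).addColumn(cf, q, 1L, Bytes.toBytes("v1")));
            table.put(new Put(row).addColumn(cf, q, 2L, Bytes.toBytes("v2")));

            // Delete only the latest version, which mirrors the retraction behaviour reported here.
            table.delete(new Delete(row).addColumn(cf, q));

            // The row is not gone: the read falls back to the older version.
            Result result = table.get(new Get(row));
            System.out.println(Bytes.toString(result.getValue(cf, q))); // prints "v1"
        }
    }
}
{code}

This is why a join against the HBase table can still return a value for a row that was already deleted upstream.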



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: (was: image-2021-12-15-20-00-22-767.png)



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Description: 
h2. Background

When we use CDC to synchronize MySQL data to HBase, we found that when a row is deleted in MySQL, HBase deletes only the latest version of the corresponding rowkey. The older versions still exist, so downstream consumers end up reading wrong data. I think this is a bug in the HBase connector.

The following figure shows how the HBase data changes before and after the MySQL row is deleted.

!image-2021-12-15-20-05-18-236.png|width=910,height=669!

  was:
h2. Background

When we use CDC to synchronize MySQL data to HBase, we found that when a row is deleted in MySQL, HBase deletes only the latest version of the corresponding rowkey. The older versions still exist, so downstream consumers end up reading wrong data. I think this is a bug in the HBase connector.

The following figure shows how the HBase data changes before and after the MySQL row is deleted.

!image-2021-12-15-19-59-31-837.png|width=925,height=680!




[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: image-2021-12-15-20-05-18-236.png



[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25330:
---
Attachment: (was: image-2021-12-15-19-59-31-837.png)



[jira] [Created] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Bruce Wong (Jira)
Bruce Wong created FLINK-25330:
--

 Summary: Flink SQL doesn't retract all versions of Hbase data
 Key: FLINK-25330
 URL: https://issues.apache.org/jira/browse/FLINK-25330
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: 1.14.0
Reporter: Bruce Wong
 Attachments: image-2021-12-15-20-00-22-767.png

h2. Background

When we use CDC to synchronize MySQL data to HBase, we found that when a row is deleted in MySQL, HBase deletes only the latest version of the corresponding rowkey. The older versions still exist, so downstream consumers end up reading wrong data. I think this is a bug in the HBase connector.

The following figure shows how the HBase data changes before and after the MySQL row is deleted.

!image-2021-12-15-19-59-31-837.png|width=925,height=680!
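The multiple versions visible in the screenshot presuppose a column family that keeps more than one version. A minimal sketch of creating such a table with the HBase 2.x Admin API; the table and family names follow the examples in this issue, the version count is illustrative.

{code:java}
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateVersionedTable {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = connection.getAdmin()) {
            admin.createTable(
                    TableDescriptorBuilder.newBuilder(TableName.valueOf("dim_hbase"))
                            // keep up to 3 versions per cell so that updates leave older versions behind
                            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
                                    .setMaxVersions(3)
                                    .build())
                            .build());
        }
    }
}
{code}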

 