[jira] [Commented] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17477531#comment-17477531 ] Bruce Wong commented on FLINK-25560:

Hi [~MartijnVisser], is there anyone else who could help review this issue? Thanks.

> Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
> -----------------------------------------------------------------------------------------------------------------
>
>          Key: FLINK-25560
>          URL: https://issues.apache.org/jira/browse/FLINK-25560
>      Project: Flink
>   Issue Type: Improvement
>   Components: Connectors / HBase
>     Reporter: Bruce Wong
>     Assignee: Jing Ge
>     Priority: Major
>       Labels: pull-request-available
>  Attachments: image-2022-01-11-20-02-17-780.png, image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png, image-2022-01-11-20-07-43-900.png, image-2022-01-11-20-09-29-074.png
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that when data is deleted in MySQL, HBase does not delete all versions of the corresponding cells, which leads to incorrect semantics. We therefore want to add a parameter that controls whether a delete retracts only the latest version or all versions.
> h1. Usage
> The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
        //TableConfig config = tEnv.getConfig();
        //config.setIdleStateRetention(Duration.ofHours(2));

        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles (\n" +
                "  id STRING,\n" +
                "  category_id STRING,\n" +
                "  upc STRING,\n" +
                "  `name` STRING,\n" +
                "  price_cents STRING,\n" +
                "  original_price_cents STRING,\n" +
                "  short_desc STRING,\n" +
                "  `desc` STRING,\n" +
                "  cover_url STRING,\n" +
                "  created_at STRING,\n" +
                "  updated_at STRING,\n" +
                "  deleted_at STRING,\n" +
                "  extra STRING,\n" +
                "  status STRING,\n" +
                "  scholarship_cents STRING,\n" +
                "  is_payback STRING,\n" +
                "  is_support_iap STRING,\n" +
                "  iap_product_id STRING,\n" +
                "  neo_product_code STRING,\n" +
                "  paid_redirect_url STRING,\n" +
                "  subscription_type STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";
        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "  rowkey STRING,\n" +
                "  cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING,\n" +
                "    original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING,\n" +
                "    created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING,\n" +
                "    scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING,\n" +
                "    neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
                ") WITH (\n" +
                "  'connector' = 'hbase-2.2',\n" +
                "  'table-name' = 'dim_hbase',\n" +
                "  'zookeeper.quorum' = 'localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1',\n" +
                "  'sink.delete.mode' = 'all-versions'\n" +
                ")";
        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT\n" +
                "  upc AS rowkey,\n" +
                "  ROW(id, category_id, upc, `name`, price_cents, original_price_cents, short_desc, `desc`,\n" +
                "    cover_url, created_at, updated_at, deleted_at, extra, status, scholarship_cents,\n" +
                "    is_payback, is_support_iap, iap_product_id, neo_product_code, paid_redirect_url,\n" +
                "    subscription_type)\n" +
                "FROM kafka_llspay_bundles";

        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        tEnv.executeSql(dml);
    }
}
{code}
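The proposed 'all-versions' mode maps naturally onto HBase's client API, where `Delete.addColumn` removes only the latest version of a cell while `Delete.addColumns` removes all versions. The following is a minimal, self-contained sketch of the two semantics using a toy in-memory versioned cell; the class name `DeleteModeDemo` is illustrative, and the counterpart option value `'latest-version'` is an assumption (the issue only shows `'all-versions'`) — this is not connector or HBase client code.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of one HBase cell with multiple timestamped versions,
// illustrating "latest-version" vs. "all-versions" delete semantics.
public class DeleteModeDemo {

    // timestamp -> value; HBase reads serve the highest timestamp
    static NavigableMap<Long, String> newCell() {
        NavigableMap<Long, String> cell = new TreeMap<>();
        cell.put(100L, "v1");
        cell.put(200L, "v2");
        cell.put(300L, "v3");
        return cell;
    }

    // hypothetical 'sink.delete.mode' = 'latest-version': only the newest
    // version is removed, so a read now resurfaces the older value.
    static NavigableMap<Long, String> deleteLatest(NavigableMap<Long, String> cell) {
        cell.remove(cell.lastKey());
        return cell;
    }

    // 'sink.delete.mode' = 'all-versions': every version is removed,
    // matching the semantics expected after a MySQL DELETE.
    static NavigableMap<Long, String> deleteAll(NavigableMap<Long, String> cell) {
        cell.clear();
        return cell;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> latest = deleteLatest(newCell());
        // Old version becomes visible again -- the bug described above.
        System.out.println(latest.get(latest.lastKey())); // prints "v2"

        NavigableMap<Long, String> all = deleteAll(newCell());
        System.out.println(all.isEmpty()); // prints "true"
    }
}
```

The "stale read after delete" in the issue is exactly the `deleteLatest` case: the sink retracted one version, and HBase then served the next-newest one.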
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Description:
h1. Motivation
When we synchronize data from MySQL to HBase, we found that when data is deleted in MySQL, HBase does not delete all versions of the corresponding cells, which leads to incorrect semantics. We therefore want to add a parameter that controls whether a delete retracts only the latest version or all versions.
h1. Usage
The test code is the same as in the issue description quoted above.
h1. Test
After the test, we found that the deleted CF was the CF specified in the Flink SQL DML statement, without affecting other CFs.
!image-2022-01-11-20-02-17-780.png|width=793,height=294!
!image-2022-01-11-20-04-48-299.png|width=793,height=338!
!image-2022-01-11-20-05-53-217.png|width=856,height=479!
Two CFs to one CF:
!image-2022-01-11-20-07-43-900.png|width=859,height=422!
The data of cf2 is still there, so only the CF specified in the Flink SQL DML statement was deleted.
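For context on the delete path: with the `debezium-json` format above, a DELETE in MySQL arrives on the Kafka topic as a Debezium change event whose `op` field is `"d"` and whose `after` field is null; the format turns this into a Flink DELETE row that the HBase sink must retract. A hypothetical, abbreviated delete event for the table above might look like the following (the field values are invented; with 'value.debezium-json.schema-include' = 'true', this object would additionally be wrapped as the `payload` inside a `{"schema": ..., "payload": ...}` envelope):

```json
{
  "before": {
    "id": "42",
    "upc": "9-780000-000002",
    "name": "demo bundle"
  },
  "after": null,
  "source": { "connector": "mysql", "table": "bundles" },
  "op": "d",
  "ts_ms": 1641895337000
}
```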
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-09-29-074.png
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-07-43-900.png
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-05-53-217.png
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-04-48-299.png
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Description:
h1. Test
After the test, we found that the deleted CF was the CF specified in the Flink SQL DML statement, without affecting other CFs.
!image-2022-01-11-20-02-17-780.png|width=793,height=294!
h1. Reference
Please see the related task: FLINK-25330
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560:
---
Attachment: image-2022-01-11-20-02-17-780.png
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560: --- Description:
h1. Motivation
When we synchronize data from MySQL to HBase, we find that when data is deleted in MySQL, HBase cannot delete all versions of the row, which leads to incorrect semantics. So we want to add a parameter to control whether a delete retracts only the latest version or all versions.
h1. Usage
The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
        //TableConfig config = tEnv.getConfig();
        //config.setIdleStateRetention(Duration.ofHours(2));

        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles (\n" +
                "  id                   STRING,\n" +
                "  category_id          STRING,\n" +
                "  upc                  STRING,\n" +
                "  `name`               STRING,\n" +
                "  price_cents          STRING,\n" +
                "  original_price_cents STRING,\n" +
                "  short_desc           STRING,\n" +
                "  `desc`               STRING,\n" +
                "  cover_url            STRING,\n" +
                "  created_at           STRING,\n" +
                "  updated_at           STRING,\n" +
                "  deleted_at           STRING,\n" +
                "  extra                STRING,\n" +
                "  status               STRING,\n" +
                "  scholarship_cents    STRING,\n" +
                "  is_payback           STRING,\n" +
                "  is_support_iap       STRING,\n" +
                "  iap_product_id       STRING,\n" +
                "  neo_product_code     STRING,\n" +
                "  paid_redirect_url    STRING,\n" +
                "  subscription_type    STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";

        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "  rowkey STRING,\n" +
                "  cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING,\n" +
                "         original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING,\n" +
                "         created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING,\n" +
                "         scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING,\n" +
                "         neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
                ") WITH (\n" +
                "  'connector' = 'hbase-2.2',\n" +
                "  'table-name' = 'dim_hbase',\n" +
                "  'zookeeper.quorum' = 'localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1',\n" +
                "  'sink.delete.mode' = 'all-versions'\n" +
                ")";

        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT\n" +
                "  upc AS rowkey,\n" +
                "  ROW(id, category_id, upc, `name`, price_cents, original_price_cents, short_desc, `desc`,\n" +
                "      cover_url, created_at, updated_at, deleted_at, extra, status, scholarship_cents,\n" +
                "      is_payback, is_support_iap, iap_product_id, neo_product_code, paid_redirect_url,\n" +
                "      subscription_type)\n" +
                "FROM kafka_llspay_bundles";

        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        tEnv.executeSql(dml);
    }
}
{code}
h1. Test
h1. Reference
Please look at the following task link: FLINK-25330
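The motivation above hinges on HBase's versioned-cell model: a delete that removes only the newest version lets an older version of the same cell resurface on the next read. A stdlib-only sketch of these semantics (this models the behavior with a `TreeMap`; it is not the HBase client API, and the class and method names are illustrative assumptions):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Models one HBase-style cell holding timestamped versions; a read returns the
// newest remaining version. Names are hypothetical, for illustration only.
public class VersionedCellDemo {
    // timestamp -> value; TreeMap keeps versions ordered by timestamp
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void put(long ts, String value) { versions.put(ts, value); }

    // "latest-version" delete: removes only the newest version,
    // so an older version becomes visible again
    void deleteLatest() {
        if (!versions.isEmpty()) versions.remove(versions.lastKey());
    }

    // "all-versions" delete: removes every version, the behavior the
    // proposed 'sink.delete.mode' = 'all-versions' asks for
    void deleteAll() { versions.clear(); }

    // read the newest remaining version, or null if fully deleted
    String read() { return versions.isEmpty() ? null : versions.lastEntry().getValue(); }

    public static void main(String[] args) {
        VersionedCellDemo cell = new VersionedCellDemo();
        cell.put(1L, "v1");
        cell.put(2L, "v2");
        cell.deleteLatest();
        System.out.println(cell.read());   // prints "v1": the stale version resurfaces
        cell.deleteAll();
        System.out.println(cell.read());   // prints "null": fully retracted
    }
}
```

Running the sketch shows the incorrect semantics directly: after a latest-version delete the read still returns the old value, which is what the reporter observed in HBase after a MySQL delete.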
[jira] [Comment Edited] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17471650#comment-17471650 ] Bruce Wong edited comment on FLINK-25330 at 1/10/22, 3:19 AM: -- Hi [~jingge] , I created a new task to solve this problem. Please take a look at that. FLINK-25560 Thanks. was (Author: bruce wong): Hi [~jingge] , I created a new task to solve this problem. Please take a look at that. [FLINK-25560|https://issues.apache.org/jira/browse/FLINK-25560] > Flink SQL doesn't retract all versions of Hbase data > > > Key: FLINK-25330 > URL: https://issues.apache.org/jira/browse/FLINK-25330 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase > Reporter: Bruce Wong > Assignee: Jing Ge > Priority: Major > Labels: pull-request-available > Attachments: Flink-SQL-Test.zip, bundle_data.zip, bundle_data1.zip, > image-2021-12-15-20-05-18-236.png, test_res.png, test_res2.png, test_res_1.png > > > h2. Background > When we use CDC to synchronize MySQL data to HBase, we find that HBase > deletes only the last version of the specified rowkey when deleting MySQL > data. The data of the old versions still exists, so you end up using the wrong > data. I think it's a bug of the HBase connector. > The following figure shows HBase data changes before and after the MySQL data is > deleted. > !image-2021-12-15-20-05-18-236.png|width=910,height=669! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17471650#comment-17471650 ] Bruce Wong commented on FLINK-25330: Hi [~jingge] , I created a new task to solve this problem. Please take a look at that. [FLINK-25560|https://issues.apache.org/jira/browse/FLINK-25560]
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560: --- Component/s: Connectors / HBase
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560: --- Affects Version/s: 1.15.0
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560: --- Description: h1. Motivation When we synchronize data from MySQL to HBase, we find that when data is deleted in MySQL, HBase cannot delete all versions, which leads to incorrect semantics. So we want to add a parameter to control deleting the latest version or deleting all versions. h1. Reference Please look at the following task link: FLINK-25330
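The proposed `'sink.delete.mode'` option takes two string values, `'latest-version'` and `'all-versions'`. A minimal, hypothetical sketch of how a connector might map the configured string to a delete behavior (the enum and method names are assumptions for illustration, not Flink's actual implementation):

```java
// Hypothetical mapping of the proposed 'sink.delete.mode' option values.
// The enum name and parse logic are illustrative assumptions only.
public enum DeleteMode {
    LATEST_VERSION("latest-version"),
    ALL_VERSIONS("all-versions");

    private final String value;

    DeleteMode(String value) { this.value = value; }

    // Resolve the configured option string to a mode, rejecting unknown values
    public static DeleteMode parse(String s) {
        for (DeleteMode m : values()) {
            if (m.value.equalsIgnoreCase(s)) return m;
        }
        throw new IllegalArgumentException("Unsupported sink.delete.mode: " + s);
    }

    public static void main(String[] args) {
        System.out.println(parse("all-versions"));   // prints "ALL_VERSIONS"
    }
}
```

Failing fast on an unknown value keeps a typo in the table DDL from silently falling back to the old latest-version-only behavior.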
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560: --- Description: h1. Motivation When we synchronize data from MySQL to HBase, we find that when data is deleted in MySQL, HBase cannot delete all versions, which leads to incorrect semantics. So we want to add a parameter to control deleting the latest version or deleting all versions. h1. Reference Please look at the following task link: [FLINK-25330|https://issues.apache.org/jira/browse/FLINK-25330]
[jira] [Updated] (FLINK-25560) Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode
[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25560: --- Summary: Add "sink.delete.mode" in HBase sql connector for retracting the latest version or all versions in changelog mode (was: Add ) > Add "sink.delete.mode" in HBase sql connector for retracting the latest > version or all versions in changelog mode > - > > Key: FLINK-25560 > URL: https://issues.apache.org/jira/browse/FLINK-25560 > Project: Flink > Issue Type: Improvement >Reporter: Bruce Wong >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25560) Add
Bruce Wong created FLINK-25560: -- Summary: Add Key: FLINK-25560 URL: https://issues.apache.org/jira/browse/FLINK-25560 Project: Flink Issue Type: Improvement Reporter: Bruce Wong -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17464946#comment-17464946 ] Bruce Wong commented on FLINK-25330: Hi, [~jingge] Thanks for your reply. I updated the test data (bundle_data1.zip), and the test result is test_res2.png. The test results are the same as before. The test code and configuration are as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
        TableConfig config = tEnv.getConfig();
        //config.setIdleStateRetention(Duration.ofHours(2));

        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles (\n" +
                "  id                   STRING,\n" +
                "  category_id          STRING,\n" +
                "  upc                  STRING,\n" +
                "  `name`               STRING,\n" +
                "  price_cents          STRING,\n" +
                "  original_price_cents STRING,\n" +
                "  short_desc           STRING,\n" +
                "  `desc`               STRING,\n" +
                "  cover_url            STRING,\n" +
                "  created_at           STRING,\n" +
                "  updated_at           STRING,\n" +
                "  deleted_at           STRING,\n" +
                "  extra                STRING,\n" +
                "  status               STRING,\n" +
                "  scholarship_cents    STRING,\n" +
                "  is_payback           STRING,\n" +
                "  is_support_iap       STRING,\n" +
                "  iap_product_id       STRING,\n" +
                "  neo_product_code     STRING,\n" +
                "  paid_redirect_url    STRING,\n" +
                "  subscription_type    STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";

        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "  rowkey STRING,\n" +
                "  cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, price_cents STRING,\n" +
                "         original_price_cents STRING, short_desc STRING, `desc` STRING, cover_url STRING,\n" +
                "         created_at STRING, updated_at STRING, deleted_at STRING, extra STRING, status STRING,\n" +
                "         scholarship_cents STRING, is_payback STRING, is_support_iap STRING, iap_product_id STRING,\n" +
                "         neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
                ") WITH (\n" +
                "  'connector' = 'hbase-2.2',\n" +
                "  'table-name' = 'dim_hbase',\n" +
                "  'zookeeper.quorum' = 'localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1'\n" +
                ")";

        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT\n" +
                "  upc AS rowkey,\n" +
                "  ROW(id, category_id, upc, `name`, price_cents, original_price_cents, short_desc, `desc`,\n" +
                "      cover_url, created_at, updated_at, deleted_at, extra, status, scholarship_cents,\n" +
                "      is_payback, is_support_iap, iap_product_id, neo_product_code, paid_redirect_url,\n" +
                "      subscription_type)\n" +
                "FROM kafka_llspay_bundles";

        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        tEnv.executeSql(dml);
    }
}
{code}
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: test_res2.png
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: bundle_data1.zip
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: test_res_1.png
[jira] [Comment Edited] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463199#comment-17463199 ] Bruce Wong edited comment on FLINK-25330 at 12/21/21, 12:14 PM: Hi [~jingge] , The bundle_data.zip is the test data; it contains data such as additions, updates, and deletions. The Flink-SQL-Test.zip is my test code. The test_res.png is the test results. And I test the code in mr. The test_res_1.png is the result I expected. was (Author: bruce wong): Hi [~jingge] , The bundle_data.zip is the test data, it contains data such as additions, updates, deletions. The Flink-SQL-Test.zip is my test code. Here are the test results. Please look at the test_res.png.
[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463199#comment-17463199 ] Bruce Wong commented on FLINK-25330: Hi [~jingge] , The bundle_data.zip is the test data, it contains data such as additions, updates, deletions. The Flink-SQL-Test.zip is my test code. Here are the test results. Please look at the test_res.png.
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: test_res.png
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: bundle_data.zip
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: (was: image-2021-12-21-20-04-47-780.png)
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: image-2021-12-21-20-04-47-780.png
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: Flink-SQL-Test.zip
[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17461166#comment-17461166 ] Bruce Wong commented on FLINK-25330: Hi [~jingge], as for HBase usage in the CDC scenario, I can prepare a Docker example later. Also, I would like to know in which Flink + HBase use case one would want to delete only the latest version and keep the previous ones. Thanks for your reply.
[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460647#comment-17460647 ] Bruce Wong commented on FLINK-25330: Hi [~jingge], could you help me review the MR? Thanks.
[jira] [Comment Edited] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460432#comment-17460432 ] Bruce Wong edited comment on FLINK-25330 at 12/16/21, 6:11 AM: --- Hi [~wenlong.lwl], in my opinion, if the user deletes a row in MySQL, the old versions of the corresponding HBase data should not be retained either; keeping them leads to incorrect semantics when joining against the HBase data before and after an HBase flush.
[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460432#comment-17460432 ] Bruce Wong commented on FLINK-25330: Hi Wenlong Lyu, in my opinion, if the user deletes a row in MySQL, the old versions of the corresponding HBase data should not be retained either; keeping them leads to incorrect semantics when joining against the HBase data before and after an HBase flush.
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: (was: image-2021-12-15-20-00-22-767.png)
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Description: updated the embedded screenshot reference from !image-2021-12-15-19-59-31-837.png|width=925,height=680! to !image-2021-12-15-20-05-18-236.png|width=910,height=669!; the description text is otherwise unchanged.
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: image-2021-12-15-20-05-18-236.png
[jira] [Updated] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Wong updated FLINK-25330: --- Attachment: (was: image-2021-12-15-19-59-31-837.png)
[jira] [Created] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data
Bruce Wong created FLINK-25330: -- Summary: Flink SQL doesn't retract all versions of Hbase data Key: FLINK-25330 URL: https://issues.apache.org/jira/browse/FLINK-25330 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: 1.14.0 Reporter: Bruce Wong Attachments: image-2021-12-15-20-00-22-767.png