[ https://issues.apache.org/jira/browse/FLINK-35689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dalongliu updated FLINK-35689:
------------------------------
    Attachment: screenshot-1.png

Release Testing: Verify FLIP-435 & FLIP-448: Introduce a New Materialized Table for Simplifying Data Pipelines
--------------------------------------------------------------------------------------------------------------

                 Key: FLINK-35689
                 URL: https://issues.apache.org/jira/browse/FLINK-35689
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API
            Reporter: dalongliu
            Assignee: dalongliu
            Priority: Blocker
              Labels: release-testing
             Fix For: 1.20.0
         Attachments: screenshot-1.png

This is a follow-up to the tests for https://issues.apache.org/jira/browse/FLINK-35187 and https://issues.apache.org/jira/browse/FLINK-35345.

Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end process, so this release testing is an overall test of the FLIP-435 and FLIP-448 features at the same time.

Since Materialized Table depends on the CatalogStore, Catalog, Workflow Scheduler, SQL Client, SQL Gateway, and a Standalone cluster to go through the whole process, the validation consists of two parts: Environment Setup and Feature Verification.

h1. Environment Setup

1. Create the file CatalogStore directory.
2. Create the test-filesystem Catalog and put flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
3. Create the Savepoint directory.
4. Configure the Flink config.yaml file:
{code:yaml}
#==============================================================================
# Common
#==============================================================================
jobmanager:
  bind-host: localhost
  rpc:
    address: localhost
    # The RPC port where the JobManager is reachable.
    port: 6123
  memory:
    process:
      size: 1600m
  execution:
    failover-strategy: region

taskmanager:
  bind-host: localhost
  host: localhost
  # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  numberOfTaskSlots: 3
  memory:
    process:
      size: 1728m

parallelism:
  # The default parallelism used for programs that did not specify one.
  default: 1

#==============================================================================
# Rest & web frontend
#==============================================================================
rest:
  # The address to which the REST client will connect
  address: localhost
  bind-address: localhost

# Catalog Store
table:
  catalog-store:
    kind: file
    file:
      path: xxx

# Embedded Scheduler config
workflow-scheduler:
  type: embedded

# SQL Gateway address
sql-gateway:
  endpoint:
    rest:
      address: 127.0.0.1
{code}
5. Start the Standalone cluster: ./bin/start-cluster.sh
6. Start the SQL Gateway: ./bin/sql-gateway.sh start
7. Start the SQL Client: ./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
8. Register the test-filesystem Catalog:
{code:sql}
CREATE CATALOG mt_cat
WITH (
  'type' = 'test-filesystem',
  'path' = '...',
  'default-database' = 'mydb'
);

USE CATALOG mt_cat;
{code}
9. Create the test-filesystem source table and insert test data:
{code:sql}
-- 1. create a JSON format table
CREATE TABLE json_source (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING,
  order_created_at STRING,
  payment_amount_cents BIGINT
) WITH (
  'format' = 'json',
  'source.monitor-interval' = '5S'
);

-- 2. insert data
INSERT INTO mt_cat.mydb.json_source VALUES
  (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
  (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
  (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
  (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
  (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
  (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
  (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
  (1008, 2, 'user4', '2024-06-25 10:03:00', 40);

INSERT INTO mt_cat.mydb.json_source VALUES
  (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
  (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
  (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
  (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
  (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
  (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
  (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
  (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
{code}
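With the environment in place, a quick sanity check can confirm the catalog registration and the test data before moving on. A minimal sketch, not part of the original steps (in the SQL Client's default streaming mode the SELECT runs continuously until cancelled):
{code:sql}
-- Hypothetical sanity check: the registered catalog should be listed,
-- and the inserted rows should become visible in the source table.
SHOW CATALOGS;
SELECT * FROM mt_cat.mydb.json_source;
{code}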
h1. Feature Verification

h2. Continuous Mode

In Continuous Mode, the Materialized Table runs a Flink streaming job to update the data in real time. Verification covers the Create, Suspend, Resume, Manual Refresh, and Drop scenarios.

1. Create the Materialized Table, including various good and bad cases, by executing the following statement in the SQL Client:
{code:sql}
CREATE MATERIALIZED TABLE continuous_users_shops
(
  PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'format' = 'debezium-json'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM(payment_amount_cents) AS payed_buy_fee_sum,
  SUM(1) AS pv
FROM (
  SELECT
    user_id,
    DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
    payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY (user_id, ds);
{code}
2. Suspend the Materialized Table by executing the following statement in the SQL Client:
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
{code}
3. Resume the Materialized Table:
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
{code}
4. Manually refresh the Materialized Table (a spot check of the refreshed partition is sketched after this list):
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH PARTITION (ds = '2024-06-25');
{code}
5. Drop the Materialized Table:
{code:sql}
DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
{code}
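As referenced in step 4, the manual refresh can be verified with a plain query before the table is dropped in step 5. A minimal sketch, assuming the refresh of that partition has completed:
{code:sql}
-- Hypothetical spot check: the manually refreshed partition should
-- contain the aggregated rows for 2024-06-25.
SELECT * FROM mt_cat.mydb.continuous_users_shops WHERE ds = '2024-06-25';
{code}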
h2. Full Mode

In Full Mode, the Materialized Table relies on the Workflow Scheduler to perform the periodic full refresh, so this section mainly verifies the FLIP-448 functionality.

1. Create the Materialized Table, verifying various good and bad cases, by executing the following statement:
{code:sql}
CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'json'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
  user_id,
  ds,
  SUM(payment_amount_cents) AS payed_buy_fee_sum,
  SUM(1) AS pv
FROM (
  SELECT
    user_id,
    DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
    payment_amount_cents
  FROM mt_cat.mydb.json_source
) AS tmp
GROUP BY (user_id, ds);
{code}
2. Suspend the Materialized Table by executing the following statement:
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
{code}
3. Resume the Materialized Table by executing the following statement:
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
{code}
4. Drop the Materialized Table by executing the following statements (the second verifies that IF EXISTS is a no-op once the table is gone):
{code:sql}
DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
{code}
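A final sanity check, not part of the original plan, assuming both sections ran to completion:
{code:sql}
-- Neither materialized table should be listed after the DROP statements;
-- only the json_source table should remain in mydb.
SHOW TABLES;
{code}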