[ 
https://issues.apache.org/jira/browse/FLINK-35609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu closed FLINK-35609.
-----------------------------
    Resolution: Fixed

> Release Testing Instructions: Verify FLIP-435: Introduce a New Materialized 
> Table for Simplifying Data Pipelines
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35609
>                 URL: https://issues.apache.org/jira/browse/FLINK-35609
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Rui Fan
>            Assignee: dalongliu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.20.0
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-35187
> Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end 
> process, so the Release testing is an overall test of FLIP-435 & FLIP-448 
> feature at the same time.
> Since Materialized Table depends on CatalogStore, Catalog, Workflow 
> Scheduler, SQL Client, SQL Gateway, and Standalone cluster to go through the 
> whole process, the validation process consists of two parts: Environment 
> Setup and Feature Verification.
> h1. Environment Setup:
> h1. 
> 1. create the File CatalogStore directory
> 2. Create the test-filesystem Catalog and put 
> flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
> 3. Create the Savepoint directory.
> 4. Configure the Flink config.yaml file.
> {code:yaml}
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager:
>   bind-host: localhost
>   rpc:
>     address: localhost
>     # The RPC port where the JobManager is reachable.
>     port: 6123
>   memory:
>     process:
>       size: 1600m
>   execution:
>     failover-strategy: region
> taskmanager:
>   bind-host: localhost
>   host: localhost
>   # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
>   numberOfTaskSlots: 3
>   memory:
>     process:
>       size: 1728m
> parallelism:
>   # The parallelism used for programs that did not specify and other 
> parallelism.
>   default: 1
> #==============================================================================
> # Rest & web frontend
> #==============================================================================
> rest:
>   # The address to which the REST client will connect to
>   address: localhost
>   bind-address: localhost
> # Catalog Store
> table:
>   catalog-store:
>     kind: file
>     file:
>       path: xxx
> # Embedded Scheduler config
> workflow-scheduler:
>   type: embedded
> # SQL Gateway address
> sql-gateway:
>   endpoint:
>     rest:
>       address: 127.0.0.1
> {code}
> 5. Start the Standalone cluster: . /bin/start-cluster.sh
> 6. Start the SQL Gateway: . /bin/sql-gateway.sh
> 7. Start SQL Client: /bin/sql-client.sh gateway --endpoint 
> http://127.0.0.1:8083
> 8. Register the test-filesystem Catalog
> {code:sql}
> CREATE CATALOG mt_cat
> WITH (
>   'type' = 'test-filesystem',
>   'path' = '...',
>   'default-database' = 'mydb'  
> );
> USE CATALOG mt_cat;
> {code}
> 9. Create the test-filesystem source table and insert the data
> {code:sql}
> -- 1. create json format table
> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH (
>   'format' = 'json',
>   'source.monitor-interval' = '5S'
> );
> -- 2. insert data
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-25 10:03:00', 40);
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
> {code}
> h1. Feature verification
> h1. 
> h2. Continuous Mode
> h2. 
> In Continuous Mode, Materialized Table runs a Flink streaming job to update 
> the data in real-time. Feature verify includes various scenarios such as 
> Create & Suspend & Resume & Drop.
> 1. Create Materialized Table, including various bad cases and good cases, and 
> execute the following statement in the SQL Client
> {code:sql}
> CREATE MATERIALIZED TABLE continuous_users_shops 
> (
>   PRIMARY KEY(id) NOT ENFORCED
> )
> WITH(
>   'format' = 'debezium-json'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT 
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
> FROM (
>     SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
> payment_amount_cents FROM json_source ) AS tmp
>  GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table and execute the following statement in the SQL 
> Client
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
> {code}
> 4. Manual Refresh Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH 
> PARTITION(ds = '2024-06-25');
> {code}
> 5. Drop Materialized Table
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
> {code}
> h2. Full Mode
> h2. 
> In Full Mode, Materialized Table needs to rely on Workflow Scheduler to 
> complete the periodic full refresh operation, so the main purpose is to 
> verify the FLIP-448 function.
> 1. Create Materialized Table, verify various good and bad cases, and execute 
> the following statement
> {code:sql}
> CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
> PARTITIONED BY (ds)
> WITH(
>   'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT 
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
> FROM (
>     SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
> payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
> GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table by executing the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table and execute the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
> {code}
> 4. Drop Materialized Table and execute the following statement
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
> DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to