dalongliu created FLINK-35689:
---------------------------------

             Summary: CLONE - Release Testing: Verify FLIP-435 & FLIP-448: 
Introduce a New Materialized Table for Simplifying Data Pipelines
                 Key: FLINK-35689
                 URL: https://issues.apache.org/jira/browse/FLINK-35689
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API
            Reporter: dalongliu
            Assignee: dalongliu
             Fix For: 1.20.0


Follow-up to the testing tracked in https://issues.apache.org/jira/browse/FLINK-35187

Materialized Table depends on both FLIP-435 and FLIP-448 to complete the end-to-end 
process, so this release testing verifies the FLIP-435 and FLIP-448 features together.
Since Materialized Table relies on the CatalogStore, Catalog, Workflow Scheduler, 
SQL Client, SQL Gateway, and a Standalone cluster to go through the whole 
process, the validation consists of two parts: Environment Setup and 
Feature Verification.

h1. Environment Setup
1. Create the file CatalogStore directory.
2. Create the test-filesystem Catalog and put 
flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
3. Create the Savepoint directory.
4. Configure the Flink config.yaml file.

{code:yaml}
#==============================================================================
# Common
#==============================================================================

jobmanager:
  bind-host: localhost
  rpc:
    address: localhost
    # The RPC port where the JobManager is reachable.
    port: 6123
  memory:
    process:
      size: 1600m
  execution:
    failover-strategy: region

taskmanager:
  bind-host: localhost
  host: localhost
  # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  numberOfTaskSlots: 3
  memory:
    process:
      size: 1728m

parallelism:
  # The default parallelism used for programs that do not specify one.
  default: 1

#==============================================================================
# Rest & web frontend
#==============================================================================

rest:
  # The address to which the REST client will connect
  address: localhost
  bind-address: localhost

# Catalog Store
table:
  catalog-store:
    kind: file
    file:
      path: xxx

# Embedded Scheduler config
workflow-scheduler:
  type: embedded

# SQL Gateway address
sql-gateway:
  endpoint:
    rest:
      address: 127.0.0.1
{code}

5. Start the Standalone cluster: ./bin/start-cluster.sh
6. Start the SQL Gateway: ./bin/sql-gateway.sh
7. Start the SQL Client: ./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
8. Register the test-filesystem Catalog

{code:sql}
CREATE CATALOG mt_cat
WITH (
  'type' = 'test-filesystem',
  'path' = '...',
  'default-database' = 'mydb'  
);

USE CATALOG mt_cat;
{code}
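
To confirm that the catalog is registered and visible through the SQL Gateway session, a quick check:

{code:sql}
SHOW CATALOGS;
SHOW CURRENT CATALOG;
{code}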

9. Create the test-filesystem source table and insert the data

{code:sql}
-- 1. Create a JSON-format source table
CREATE TABLE json_source (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING,
  order_created_at STRING,
  payment_amount_cents BIGINT
) WITH (
  'format' = 'json',
  'source.monitor-interval' = '5s'
);

-- 2. Insert data
INSERT INTO mt_cat.mydb.json_source VALUES
(1001, 1, 'user1', '2024-06-24 10:00:00', 10),
(1002, 1, 'user2', '2024-06-24 10:01:00', 20),
(1003, 2, 'user3', '2024-06-24 10:02:00', 30),
(1004, 2, 'user4', '2024-06-24 10:03:00', 40),
(1005, 1, 'user1', '2024-06-25 10:00:00', 10),
(1006, 1, 'user2', '2024-06-25 10:01:00', 20),
(1007, 2, 'user3', '2024-06-25 10:02:00', 30),
(1008, 2, 'user4', '2024-06-25 10:03:00', 40);

INSERT INTO mt_cat.mydb.json_source VALUES
(1001, 1, 'user1', '2024-06-26 10:00:00', 10),
(1002, 1, 'user2', '2024-06-26 10:01:00', 20),
(1003, 2, 'user3', '2024-06-26 10:02:00', 30),
(1004, 2, 'user4', '2024-06-26 10:03:00', 40),
(1005, 1, 'user1', '2024-06-27 10:00:00', 10),
(1006, 1, 'user2', '2024-06-27 10:01:00', 20),
(1007, 2, 'user3', '2024-06-27 10:02:00', 30),
(1008, 2, 'user4', '2024-06-27 10:03:00', 40);
{code}
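
Optionally, sanity-check the source data before moving on:

{code:sql}
-- Should return the 16 rows inserted above
SELECT * FROM mt_cat.mydb.json_source;
{code}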

h1. Feature Verification

h2. Continuous Mode
In Continuous Mode, the Materialized Table runs a Flink streaming job to update the 
data in real time. Feature verification covers scenarios such as Create, Suspend, 
Resume, Manual Refresh, and Drop.

1. Create the Materialized Table, covering both good and bad cases, by executing 
the following statement in the SQL Client (an example bad case is sketched after 
the statement)

{code:sql}
CREATE MATERIALIZED TABLE continuous_users_shops
(
  PRIMARY KEY (user_id, ds) NOT ENFORCED
)
WITH (
  'format' = 'debezium-json'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM(payment_amount_cents) AS payed_buy_fee_sum,
  SUM(1) AS pv
FROM (
  SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;
{code}
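
Among the bad cases, one worth covering is omitting the FRESHNESS clause, which is 
mandatory for Materialized Tables and should be rejected (a sketch; the table name 
is illustrative and the exact error text may differ):

{code:sql}
-- Expected to fail: FRESHNESS is not specified
CREATE MATERIALIZED TABLE bad_users_shops
AS SELECT user_id FROM json_source;
{code}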

2. Suspend the Materialized Table by executing the following statement in the SQL 
Client

{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
{code}

3. Resume the Materialized Table

{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
{code}
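
Per FLIP-435, RESUME can also carry dynamic options that are applied to the 
restarted refresh job; a hedged sketch (the option key here is illustrative):

{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops
RESUME WITH ('table.exec.resource.default-parallelism' = '2');
{code}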

4. Manually refresh a partition of the Materialized Table

{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH PARTITION (ds = '2024-06-25');
{code}
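
The refreshed partition can then be verified with a partition-filtered query:

{code:sql}
SELECT user_id, ds, payed_buy_fee_sum, pv
FROM mt_cat.mydb.continuous_users_shops
WHERE ds = '2024-06-25';
{code}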

5. Drop the Materialized Table

{code:sql}
DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
{code}

h2. Full Mode
In Full Mode, the Materialized Table relies on the Workflow Scheduler to perform 
the periodic full refresh, so this part mainly verifies the FLIP-448 
functionality.

1. Create the Materialized Table, verifying both good and bad cases, by executing 
the following statement

{code:sql}
CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
PARTITIONED BY (ds)
WITH(
  'format' = 'json'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT 
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS pv
FROM (
    SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
GROUP BY (user_id, ds);
{code}
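
Once the embedded workflow scheduler has fired the first refresh (within the 
1-minute freshness interval), confirm that data has landed:

{code:sql}
SELECT * FROM mt_cat.mydb.full_users_shops;
{code}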

2. Suspend the Materialized Table by executing the following statement

{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
{code}

3. Resume the Materialized Table by executing the following statement

{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
{code}

4. Drop the Materialized Table by executing the following statements (the second 
form verifies IF EXISTS behavior on an already-dropped table)

{code:sql}
DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;

DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
{code}
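
To confirm the removal, listing the tables in the database should no longer 
include the Materialized Table:

{code:sql}
SHOW TABLES;
{code}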



