[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758339#comment-17758339
 ] 

Hang Ruan edited comment on FLINK-32798 at 8/24/23 4:00 AM:
------------------------------------------------------------

Hi, all. I have done some tests about this feature.

1. Create a MySQL table to store the changes.
{code:java}
-- Table that MyCatalogListener inserts into: one row per catalog modification event.
CREATE TABLE `listener_test` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `catalog` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC; {code}
2.Develop the Factory and Listener.
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import 
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;

/**
 * Factory that creates the {@link MyCatalogListener} used in this test.
 *
 * <p>The connection settings are hard-coded constants in this example. The value returned by
 * {@link #factoryIdentifier()} is the name referenced from
 * {@code table.catalog-modification.listeners} in {@code flink-conf.yaml}.
 */
public class MyCatalogListenerFactory implements CatalogModificationListenerFactory {
    /** JDBC URL of the MySQL database that holds the {@code listener_test} table. */
    private static final String JDBC_URL =
            "jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000";

    /** Database user passed to the listener. */
    private static final String USERNAME = "username";

    /** Database password passed to the listener. */
    private static final String PASSWORD = "password";

    /** Identifier under which this factory is looked up. */
    private static final String IDENTIFIER = "test";

    /**
     * Creates a listener bound to the hard-coded JDBC settings.
     *
     * @param context factory context supplied by Flink; not used by this implementation
     * @return a new {@link MyCatalogListener}
     */
    @Override
    public CatalogModificationListener createListener(Context context) {
        return new MyCatalogListener(JDBC_URL, USERNAME, PASSWORD);
    }

    /**
     * @return the identifier of this factory, {@code "test"}
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
} {code}
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;

public class MyCatalogListener implements CatalogModificationListener {
    private final String jdbcUrl;
    private final String username;
    private final String password;

    public MyCatalogListener(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void onEvent(CatalogModificationEvent event) {
        try {
            Class.forName(getDriverClassName());

            try (Connection connection = DriverManager.getConnection(jdbcUrl, 
username, password);
                 PreparedStatement statement = 
connection.prepareStatement("INSERT INTO `listener_test` 
(`catalog`,`identifier`,`type`,`detail`) VALUES (?,?,?,?);")) {
                String identifier;
                String type;
                String detail;
                String catalog = event.context().getCatalogName();
                if (event instanceof CreateDatabaseEvent) {
                    CreateDatabaseEvent cde = (CreateDatabaseEvent) event;
                    identifier = "DB:" + cde.databaseName();
                    detail = 
cde.database().getProperties().entrySet().stream().map(e -> e.getKey() + ":" + 
e.getValue()).collect(Collectors.joining(", "));
                    type = "CREATE DB";
                } else if (event instanceof AlterDatabaseEvent) {
                    AlterDatabaseEvent ade = (AlterDatabaseEvent) event;
                    identifier = "DB:" + ade.databaseName();
                    detail = 
ade.newDatabase().getProperties().entrySet().stream().map(e -> e.getKey() + ":" 
+ e.getValue()).collect(Collectors.joining(", "));
                    type = "ALTER DB";
                } else if (event instanceof DropDatabaseEvent) {
                    DropDatabaseEvent dde = (DropDatabaseEvent) event;
                    identifier = "DB:" + dde.databaseName();
                    detail = "null";
                    type = "DELETE DB";
                } else if (event instanceof CreateTableEvent) {
                    CreateTableEvent cte = (CreateTableEvent) event;
                    identifier = "TBL:" + cte.identifier().toString();
                    detail = cte.table().toString();
                    type = "CREATE TBL";
                } else if (event instanceof AlterTableEvent) {
                    AlterTableEvent ate = (AlterTableEvent) event;
                    identifier = "TBL:" + ate.identifier().toString();
                    detail = ate.newTable().toString();
                    type = "ALTER TBL";
                } else if (event instanceof DropTableEvent) {
                    DropTableEvent dte = (DropTableEvent) event;
                    identifier = "TBL:" + dte.identifier().toString();
                    detail = dte.table().toString();
                    type = "DELETE TBL";
                } else {
                    throw new IllegalArgumentException("Unknown event type.");
                }
                statement.setObject(1, catalog);
                statement.setObject(2, identifier);
                statement.setObject(3, type);
                statement.setObject(4, detail);
                statement.execute();
            }
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    private String getDriverClassName() {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            return "com.mysql.cj.jdbc.Driver";
        } catch (ClassNotFoundException e) {
            return "com.mysql.jdbc.Driver";
        }
    }
} {code}
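3. Add the service file `META-INF/services/org.apache.flink.table.factories.Factory` to the resources; it must contain the fully qualified class name of the factory (`org.self.listener.MyCatalogListenerFactory`).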

4.Package the code and put it in `lib`.

5.Add `table.catalog-modification.listeners: test` to `flink-conf.yaml`.

6. Start the SQL client and test. The test results are as follows.

!sqls.png!

!result.png!

It works well when used from the SQL client. I think we should add Steps 3 
and 4 to the docs.

Besides that, I think we should describe how to provide parameters for the 
listener in docs.


was (Author: ruanhang1993):
Hi, all. I have done some tests about this feature.

1. Create a MySQL table to store the changes.
{code:java}
-- Table that MyCatalogListener inserts into: one row per catalog modification event.
CREATE TABLE `listener_test` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `catalog` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC; {code}
2.Develop the Factory and Listener.
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import 
org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;

/**
 * Factory that creates the {@link MyCatalogListener} used in this test.
 *
 * <p>The connection settings are hard-coded constants in this example. The value returned by
 * {@link #factoryIdentifier()} is the name referenced from
 * {@code table.catalog-modification.listeners} in {@code flink-conf.yaml}.
 */
public class MyCatalogListenerFactory implements CatalogModificationListenerFactory {
    /** JDBC URL of the MySQL database that holds the {@code listener_test} table. */
    private static final String JDBC_URL =
            "jdbc:mysql://hostname:3306/db?useSSL=false&connectTimeout=30000";

    /** Database user passed to the listener. */
    private static final String USERNAME = "username";

    /** Database password passed to the listener. */
    private static final String PASSWORD = "password";

    /** Identifier under which this factory is looked up. */
    private static final String IDENTIFIER = "test";

    /**
     * Creates a listener bound to the hard-coded JDBC settings.
     *
     * @param context factory context supplied by Flink; not used by this implementation
     * @return a new {@link MyCatalogListener}
     */
    @Override
    public CatalogModificationListener createListener(Context context) {
        return new MyCatalogListener(JDBC_URL, USERNAME, PASSWORD);
    }

    /**
     * @return the identifier of this factory, {@code "test"}
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
} {code}
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;

public class MyCatalogListener implements CatalogModificationListener {
    private final String jdbcUrl;
    private final String username;
    private final String password;

    public MyCatalogListener(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void onEvent(CatalogModificationEvent event) {
        try {
            Class.forName(getDriverClassName());

            try (Connection connection = DriverManager.getConnection(jdbcUrl, 
username, password);
                 PreparedStatement statement = 
connection.prepareStatement("INSERT INTO `listener_test` 
(`catalog`,`identifier`,`type`,`detail`) VALUES (?,?,?,?);")) {
                String identifier;
                String type;
                String detail;
                String catalog = event.context().getCatalogName();
                if (event instanceof CreateDatabaseEvent) {
                    CreateDatabaseEvent cde = (CreateDatabaseEvent) event;
                    identifier = "DB:" + cde.databaseName();
                    detail = 
cde.database().getProperties().entrySet().stream().map(e -> e.getKey() + ":" + 
e.getValue()).collect(Collectors.joining(", "));
                    type = "CREATE DB";
                } else if (event instanceof AlterDatabaseEvent) {
                    AlterDatabaseEvent ade = (AlterDatabaseEvent) event;
                    identifier = "DB:" + ade.databaseName();
                    detail = 
ade.newDatabase().getProperties().entrySet().stream().map(e -> e.getKey() + ":" 
+ e.getValue()).collect(Collectors.joining(", "));
                    type = "ALTER DB";
                } else if (event instanceof DropDatabaseEvent) {
                    DropDatabaseEvent dde = (DropDatabaseEvent) event;
                    identifier = "DB:" + dde.databaseName();
                    detail = "null";
                    type = "DELETE DB";
                } else if (event instanceof CreateTableEvent) {
                    CreateTableEvent cte = (CreateTableEvent) event;
                    identifier = "TBL:" + cte.identifier().toString();
                    detail = cte.table().toString();
                    type = "CREATE TBL";
                } else if (event instanceof AlterTableEvent) {
                    AlterTableEvent ate = (AlterTableEvent) event;
                    identifier = "TBL:" + ate.identifier().toString();
                    detail = ate.newTable().toString();
                    type = "ALTER TBL";
                } else if (event instanceof DropTableEvent) {
                    DropTableEvent dte = (DropTableEvent) event;
                    identifier = "TBL:" + dte.identifier().toString();
                    detail = dte.table().toString();
                    type = "DELETE TBL";
                } else {
                    throw new IllegalArgumentException("Unknown event type.");
                }
                statement.setObject(1, catalog);
                statement.setObject(2, identifier);
                statement.setObject(3, type);
                statement.setObject(4, detail);
                statement.execute();
            }
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    private String getDriverClassName() {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            return "com.mysql.cj.jdbc.Driver";
        } catch (ClassNotFoundException e) {
            return "com.mysql.jdbc.Driver";
        }
    }
} {code}
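3. Add the service file `META-INF/services/org.apache.flink.table.factories.Factory` to the resources; it must contain the fully qualified class name of the factory (`org.self.listener.MyCatalogListenerFactory`).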

4.Package the code and put it in `lib`.

5.Add `table.catalog-modification.listeners: test` to `flink-conf.yaml`.

6. Start the SQL client and test. The test results are as follows.

!sqls.png!

!result.png!

It works well when used from the SQL client. I think we should add Steps 3 
and 4 to the docs.

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-32798
>                 URL: https://issues.apache.org/jira/browse/FLINK-32798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.18.0
>            Reporter: Qingsheng Ren
>            Priority: Major
>             Fix For: 1.18.0
>
>         Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to