[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dongwoo.kim updated FLINK-35560:
--------------------------------
    Summary: Support custom query operation validator for sql gateway  (was: Add query validator support to flink sql gateway via spi pattern)

> Support custom query operation validator for sql gateway
> --------------------------------------------------------
>
>                 Key: FLINK-35560
>                 URL: https://issues.apache.org/jira/browse/FLINK-35560
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Gateway
>            Reporter: dongwoo.kim
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> Hello, I'd like to suggest query validator support in the Flink SQL Gateway via the SPI pattern.
> As SQL Gateway operators, we need query validation so that only safe queries are executed and unsafe queries are rejected.
> To address this need, I propose adding a {{QueryValidator}} interface to the Flink SQL Gateway API package.
> This interface would allow users to plug in their own query validation logic, which benefits Flink SQL Gateway operators.
> h3. Interface
> Below is a draft of the interface.
> It takes an {{Operation}} and checks whether the query is valid.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> 
> import org.apache.flink.annotation.Public;
> import org.apache.flink.table.operations.Operation;
> 
> /**
>  * Interface for implementing a validator that checks the safety of executing queries.
>  */
> @Public
> public interface QueryValidator {
> 
>     boolean validateQuery(Operation op);
> }
> {code}
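> As a sketch of the "SPI pattern" part, the gateway could discover registered
> validators with {{java.util.ServiceLoader}} and reject an operation as soon as
> any validator vetoes it. The {{QueryValidators}} helper below is illustrative
> only, not part of any existing gateway API:
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> 
> import org.apache.flink.table.operations.Operation;
> 
> import java.util.ServiceLoader;
> 
> /** Hypothetical helper that runs every validator found on the classpath. */
> public final class QueryValidators {
> 
>     private QueryValidators() {}
> 
>     /** Returns false as soon as any registered validator rejects the operation. */
>     public static boolean validate(Operation op) {
>         for (QueryValidator validator : ServiceLoader.load(QueryValidator.class)) {
>             if (!validator.validateQuery(op)) {
>                 return false;
>             }
>         }
>         return true;
>     }
> }
> {code}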
> h3. Example implementation
> Below is an example implementation that inspects Kafka table options,
> specifically {{scan.startup.timestamp-millis}}, and rejects the query when the
> value is too far in the past, which can cause high disk I/O load.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> 
> import org.apache.flink.table.operations.Operation;
> import org.apache.flink.table.operations.ddl.CreateTableOperation;
> 
> public class KafkaTimestampValidator implements QueryValidator {
> 
>     private static final long ONE_DAY = 24 * 60 * 60 * 1000L;
> 
>     @Override
>     public boolean validateQuery(Operation op) {
>         if (op instanceof CreateTableOperation) {
>             CreateTableOperation createTableOp = (CreateTableOperation) op;
>             String connector = createTableOp.getCatalogTable().getOptions().get("connector");
>             if ("kafka".equals(connector)) {
>                 String startupTimestamp =
>                         createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
>                 // Reject tables that would start scanning more than one day in the past.
>                 if (startupTimestamp != null
>                         && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
>                     return false;
>                 }
>             }
>         }
>         return true;
>     }
> }
> {code}
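> Following the standard Java SPI convention, the example implementation would be
> registered through a provider-configuration file on the gateway's classpath
> (assuming {{ServiceLoader}}-based discovery as sketched above; the exact
> mechanism would be settled during implementation):
> {code}
> # META-INF/services/org.apache.flink.table.gateway.api.validator.QueryValidator
> org.apache.flink.table.gateway.api.validator.KafkaTimestampValidator
> {code}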
> I'd be happy to implement this feature if we can reach an agreement.
> Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
