[ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:
--------------------------------
    Description: 
h3. Summary

Hello, I'd like to propose support for custom, pluggable query operation 
validators in the Flink SQL Gateway.

As a SQL Gateway operator, I see a need for operation validation to ensure 
that only safe operations are executed and unsafe operations are rejected.
To address this need, I propose adding an {{OperationValidator}} interface to 
the Flink SQL Gateway API package. 
This interface will allow users to implement their own operation validation 
logic, which benefits Flink SQL Gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an {{Operation}} and checks whether it is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.core.plugin.Plugin;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

/** Validates operations before the SQL gateway executes them. */
public interface OperationValidator extends Plugin {
    /**
     * Validate a given operation and return an optional error string.
     *
     * @param op the operation to be validated.
     * @return Optional error string, should be present only if validation resulted in an error.
     */
    Optional<String> validate(Operation op);
}
{code}
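To illustrate how such validators might be applied, here is a minimal, self-contained sketch that runs a list of validators against an operation and stops at the first reported error. It is not taken from the proposal or from Flink: {{ValidatorChainSketch}}, {{firstError}} and {{NO_DROP}} are hypothetical names, and {{Operation}} and {{OperationValidator}} are redeclared as local stand-ins (without the {{Plugin}} supertype) so the example has no Flink dependency.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Stand-in for org.apache.flink.table.operations.Operation (assumption: only
// asSummaryString() is needed for this illustration).
interface Operation {
    String asSummaryString();
}

// Mirrors the proposed interface, minus the Plugin supertype.
interface OperationValidator {
    Optional<String> validate(Operation op);
}

final class ValidatorChainSketch {
    private ValidatorChainSketch() {}

    // Hypothetical validator: rejects any operation whose summary starts with "DROP".
    static final OperationValidator NO_DROP =
            op -> {
                if (op.asSummaryString().startsWith("DROP")) {
                    return Optional.of("DROP statements are not allowed");
                }
                return Optional.empty();
            };

    /** Runs the validators in order and returns the first reported error, if any. */
    static Optional<String> firstError(List<OperationValidator> validators, Operation op) {
        for (OperationValidator validator : validators) {
            Optional<String> error = validator.validate(op);
            if (error.isPresent()) {
                return error;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<OperationValidator> validators = Arrays.asList(NO_DROP);
        Operation select = () -> "SELECT 1";
        Operation drop = () -> "DROP TABLE t";

        // An empty result means every validator accepted the operation.
        System.out.println(firstError(validators, select).orElse("accepted"));
        System.out.println(firstError(validators, drop).orElse("accepted"));
    }
}
```

Stopping at the first error keeps the reported message unambiguous. Collecting every error would be an equally reasonable choice, and the proposal does not say which behaviour is intended.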
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the operation when 
the value is more than one day in the past, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.util.Map;
import java.util.Optional;

public class KafkaTableValidator implements OperationValidator {
    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public Optional<String> validate(Operation op) {
        if (op instanceof CreateTableOperation) {
            final CreateTableOperation createTableOp = (CreateTableOperation) op;
            final Map<String, String> options = createTableOp.getCatalogTable().getOptions();
            if ("kafka".equals(options.get("connector"))) {
                try {
                    // Long.parseLong throws NumberFormatException for a missing (null)
                    // or malformed value.
                    final long startupTimestampValue =
                            Long.parseLong(options.get("scan.startup.timestamp-millis"));
                    if (startupTimestampValue < System.currentTimeMillis() - ONE_DAY) {
                        return Optional.of(
                                String.format(
                                        "Validation failed in %s: 'scan.startup.timestamp-millis' is too old."
                                                + " Given value: %d. It must be within one day from the current time.",
                                        KafkaTableValidator.class.getName(),
                                        startupTimestampValue));
                    }
                } catch (NumberFormatException e) {
                    // Option absent or unparsable: nothing for this validator to check.
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
{code}
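
As a hedged illustration of the check above, the sketch below isolates the timestamp rule as a pure function over the table options map, with the current time passed in so the behaviour is deterministic. It is not part of the proposal: {{KafkaStartupTimestampCheck}} and {{check}} are hypothetical names, and the error message is shortened for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class KafkaStartupTimestampCheck {
    static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;

    private KafkaStartupTimestampCheck() {}

    /**
     * Returns an error if the options describe a Kafka table whose
     * 'scan.startup.timestamp-millis' lies more than one day before nowMillis.
     */
    static Optional<String> check(Map<String, String> options, long nowMillis) {
        if (!"kafka".equals(options.get("connector"))) {
            return Optional.empty(); // Not a Kafka table: nothing to check.
        }
        String raw = options.get("scan.startup.timestamp-millis");
        if (raw == null) {
            return Optional.empty(); // Option not set: nothing to check.
        }
        try {
            long startupTimestamp = Long.parseLong(raw);
            if (startupTimestamp < nowMillis - ONE_DAY_MILLIS) {
                return Optional.of(
                        String.format(
                                "'scan.startup.timestamp-millis' is too old. Given value: %d.",
                                startupTimestamp));
            }
        } catch (NumberFormatException e) {
            // Malformed value: accepted here, mirroring the example validator.
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");

        // Two days in the past: rejected.
        options.put("scan.startup.timestamp-millis", Long.toString(now - 2 * ONE_DAY_MILLIS));
        System.out.println(check(options, now).orElse("accepted"));

        // One hour in the past: accepted.
        options.put("scan.startup.timestamp-millis", Long.toString(now - 60 * 60 * 1000L));
        System.out.println(check(options, now).orElse("accepted"));
    }
}
```

Passing the current time as a parameter instead of calling {{System.currentTimeMillis()}} inside the check makes the one-day boundary easy to exercise in unit tests.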

I look forward to feedback from the community.
Thanks


> Support custom query operation validator for sql gateway
> --------------------------------------------------------
>
>                 Key: FLINK-35560
>                 URL: https://issues.apache.org/jira/browse/FLINK-35560
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Gateway
>            Reporter: dongwoo.kim
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
