dongwoo.kim created FLINK-35560:
-----------------------------------
Summary: Add query validator support to flink sql gateway via spi pattern
Key: FLINK-35560
URL: https://issues.apache.org/jira/browse/FLINK-35560
Project: Flink
Issue Type: Improvement
Components: Table SQL / Gateway
Reporter: dongwoo.kim
h3. Summary
Hello, I'd like to propose adding query validator support to the Flink SQL Gateway via the SPI (Service Provider Interface) pattern.
Operators of a SQL gateway need a way to validate queries, so that only safe queries are executed and unsafe ones are rejected.
To address this need, I propose adding a {{QueryValidator}} interface to the Flink SQL Gateway API package.
Users could implement this interface with their own validation logic, giving gateway operators control over which queries are allowed to run.
h3. Interface
Below is a draft of the interface.
It takes an {{Operation}} and returns whether the query is valid, i.e. safe to execute.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing queries.
 */
@Public
public interface QueryValidator {
    /**
     * Returns {@code true} if the given operation is safe to execute, {@code false} to reject it.
     */
    boolean validateQuery(Operation op);
}
{code}
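The summary proposes plugging validators in via the SPI pattern. Below is a minimal sketch of what the gateway side could look like, assuming discovery through the JDK's {{java.util.ServiceLoader}} (implementations listed by class name in a {{META-INF/services/org.apache.flink.table.gateway.api.validator.QueryValidator}} file) and assuming a statement is rejected as soon as any validator returns {{false}}. To stay runnable without Flink on the classpath, {{Operation}} and {{QueryValidator}} are minimal stand-ins, and {{ValidatorChain}}, {{loadValidators}} and {{isAllowed}} are hypothetical names, not existing gateway APIs.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Stand-in for org.apache.flink.table.operations.Operation (sketch only, not the Flink type).
interface Operation {
    String asSummaryString();
}

// Stand-in mirroring the proposed QueryValidator interface.
interface QueryValidator {
    boolean validateQuery(Operation op);
}

// Hypothetical gateway-side helper that applies all registered validators; not part of Flink.
final class ValidatorChain {
    private final List<QueryValidator> validators;

    ValidatorChain(List<QueryValidator> validators) {
        this.validators = List.copyOf(validators);
    }

    // Discovers every QueryValidator implementation registered under META-INF/services.
    static ValidatorChain loadValidators() {
        List<QueryValidator> found = new ArrayList<>();
        for (QueryValidator v : ServiceLoader.load(QueryValidator.class)) {
            found.add(v);
        }
        return new ValidatorChain(found);
    }

    // Returns true only if every validator accepts the operation.
    // With no validators registered, every operation is allowed.
    boolean isAllowed(Operation op) {
        for (QueryValidator v : validators) {
            if (!v.validateQuery(op)) {
                return false; // stop at the first rejection
            }
        }
        return true;
    }
}
{code}

Stopping at the first rejection keeps the check cheap; a real integration would presumably also want to report which validator rejected the statement.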
h3. Example implementation
Below is an example implementation that inspects Kafka table options, specifically {{scan.startup.timestamp-millis}}, and rejects the query when the value is too small (more than one day in the past), since reading from such an early position can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.util.Map;

/** Rejects Kafka tables whose startup timestamp lies more than one day in the past. */
public class KafkaTimestampValidator implements QueryValidator {
    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            Map<String, String> options = createTableOp.getCatalogTable().getOptions();
            if ("kafka".equals(options.get("connector"))) {
                String startupTimestamp = options.get("scan.startup.timestamp-millis");
                // A timestamp older than one day replays a large backlog and can cause high disk I/O.
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp)
                                < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        // Every other operation is considered safe.
        return true;
    }
}
{code}
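The rule in {{KafkaTimestampValidator}} reads {{System.currentTimeMillis()}} directly, which makes it awkward to unit-test. A minimal sketch, assuming the same one-day threshold, factors the check into a pure function that takes the current time as a parameter. {{StartupTimestampPolicy}} and {{isStartupTimestampAllowed}} are hypothetical names, and treating a non-numeric value as unsafe is an assumption, not something the proposal specifies.

{code:java}
import java.time.Duration;

// Hypothetical helper; the name and the reject-on-parse-failure behaviour are assumptions.
final class StartupTimestampPolicy {
    static final long MAX_LOOKBACK_MILLIS = Duration.ofDays(1).toMillis();

    private StartupTimestampPolicy() {}

    /**
     * Returns true if a Kafka 'scan.startup.timestamp-millis' value may be used.
     * A missing option is allowed; a timestamp more than one day before nowMillis is rejected.
     */
    static boolean isStartupTimestampAllowed(String startupTimestamp, long nowMillis) {
        if (startupTimestamp == null) {
            return true;
        }
        final long ts;
        try {
            ts = Long.parseLong(startupTimestamp.trim());
        } catch (NumberFormatException e) {
            return false; // assumption: malformed values are treated as unsafe
        }
        // Same boundary as the validator above: only strictly older than one day is rejected.
        return ts >= nowMillis - MAX_LOOKBACK_MILLIS;
    }
}
{code}

The validator could then call {{isStartupTimestampAllowed(startupTimestamp, System.currentTimeMillis())}}, keeping the clock at the edge and the rule itself deterministic.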
I'd be happy to implement this feature if we can reach an agreement.
Thanks
--
This message was sent by Atlassian Jira
(v8.20.10#820010)