[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021808704


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##
@@ -84,6 +90,30 @@ public OperationExecutor(SessionContext context, 
Configuration executionConfig)
 this.executionConfig = executionConfig;
 }
 
+public ResultFetcher configureSession(OperationHandle handle, String 
statement) {
+TableEnvironmentInternal tableEnv = getTableEnvironment();
+List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+if (parsedOperations.size() > 1) {
+throw new UnsupportedOperationException(
+"Unsupported SQL statement! Execute statement only accepts 
a single SQL statement or "
++ "multiple 'INSERT INTO' statements wrapped in a 
'STATEMENT SET' block.");
+}
+Operation op = parsedOperations.get(0);
+if (!(op instanceof SetOperation)
+&& !(op instanceof ResetOperation)
+&& !(op instanceof CreateOperation)
+&& !(op instanceof DropOperation)
+&& !(op instanceof UseOperation)
+&& !(op instanceof AlterOperation)
+&& !(op instanceof LoadModuleOperation)
+&& !(op instanceof UnloadModuleOperation)
+&& !(op instanceof AddJarOperation)) {

Review Comment:
   Should `RemoveJarOperation` be in this list?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021816197


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##
@@ -157,6 +169,80 @@ public void testOpenSessionWithEnvironment() throws 
Exception {
 assertThat(tableEnv.listModules()).contains(moduleName);
 }
 
+@Test
+public void testConfigureSessionWithLegalStatement(@TempDir 
java.nio.file.Path tmpDir)
+throws Exception {
+SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+// SET & RESET
+verifyConfigureSession(sessionHandle, "SET 'key1' = 'value1';");
+Map<String, String> config = new HashMap<>();
+config.put("key1", "value1");
+
assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config);
+
+verifyConfigureSession(sessionHandle, "RESET 'key1';");
+
assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", 
"value1");
+
+// CREATE & USE & ALTER & DROP
+verifyConfigureSession(
+sessionHandle,
+"CREATE CATALOG mycat with ('type' = 'generic_in_memory', 
'default-database' = 'db');");
+
+verifyConfigureSession(sessionHandle, "USE CATALOG mycat;");
+
assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat");

Review Comment:
   Is it worth checking the `type` and/or `default-database`? 






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021817966


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##
@@ -157,6 +169,80 @@ public void testOpenSessionWithEnvironment() throws 
Exception {
 assertThat(tableEnv.listModules()).contains(moduleName);
 }
 
+@Test
+public void testConfigureSessionWithLegalStatement(@TempDir 
java.nio.file.Path tmpDir)
+throws Exception {
+SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+// SET & RESET
+verifyConfigureSession(sessionHandle, "SET 'key1' = 'value1';");
+Map<String, String> config = new HashMap<>();
+config.put("key1", "value1");
+
assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config);
+
+verifyConfigureSession(sessionHandle, "RESET 'key1';");
+
assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", 
"value1");
+
+// CREATE & USE & ALTER & DROP
+verifyConfigureSession(
+sessionHandle,
+"CREATE CATALOG mycat with ('type' = 'generic_in_memory', 
'default-database' = 'db');");
+
+verifyConfigureSession(sessionHandle, "USE CATALOG mycat;");
+
assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat");
+
+verifyConfigureSession(
+sessionHandle, "CREATE TABLE db.tbl (score INT) WITH 
('connector' = 'datagen');");
+
+Set<TableKind> tableKinds = new HashSet<>();
+tableKinds.add(TableKind.TABLE);
+assertThat(service.listTables(sessionHandle, "mycat", "db", 
tableKinds))
+.contains(
+new TableInfo(ObjectIdentifier.of("mycat", "db", 
"tbl"), TableKind.TABLE));
+
+verifyConfigureSession(sessionHandle, "ALTER TABLE db.tbl RENAME TO 
tbl1;");
+assertThat(service.listTables(sessionHandle, "mycat", "db", 
tableKinds))
+.doesNotContain(
+new TableInfo(ObjectIdentifier.of("mycat", "db", 
"tbl"), TableKind.TABLE))
+.contains(
+new TableInfo(ObjectIdentifier.of("mycat", "db", 
"tbl1"), TableKind.TABLE));
+
+service.configureSession(sessionHandle, "USE CATALOG 
default_catalog;", 0);
+verifyConfigureSession(sessionHandle, "DROP CATALOG mycat;");
+
assertThat(service.listCatalogs(sessionHandle)).doesNotContain("mycat");
+
+// LOAD & UNLOAD MODULE
+verifyConfigureSession(sessionHandle, "LOAD MODULE dummy;");
+
+TableEnvironmentInternal tableEnv =
+
service.getSession(sessionHandle).createExecutor().getTableEnvironment();
+assertThat(
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("SHOW FULL 
MODULES;").collect()))
+.contains(Row.of("dummy", true));
+
+verifyConfigureSession(sessionHandle, "UNLOAD MODULE dummy;");
+assertThat(
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("SHOW FULL 
MODULES;").collect()))
+.isEqualTo(Collections.singletonList(Row.of("core", true)));
+
+// ADD JAR
+String udfClassName = GENERATED_LOWER_UDF_CLASS + new 
Random().nextInt(50);
+String jarPath =
+UserClassLoaderJarTestUtils.createJarFile(
+new File(tmpDir.toUri()),
+"test-add-jar.jar",
+udfClassName,
+String.format(GENERATED_LOWER_UDF_CODE, 
udfClassName))
+.toURI()
+.toString();
+verifyConfigureSession(sessionHandle, String.format("ADD JAR '%s';", 
jarPath));
+
+assertThat(CollectionUtil.iteratorToList(tableEnv.executeSql("SHOW 
JARS;").collect()))
+.isEqualTo(Collections.singletonList(Row.of(new Path(jarPath).getPath())));

Review Comment:
   Assuming that `REMOVE JAR` is added above, it'd make sense to remove the 
jar here.






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021869458


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -311,8 +340,29 @@ public GatewayInfo getGatewayInfo() {
 return GatewayInfo.INSTANCE;
 }
 
+// 

+
 @VisibleForTesting
 public Session getSession(SessionHandle sessionHandle) {
 return sessionManager.getSession(sessionHandle);
 }
+
+/** Fetch all results for configuring session. */
+private ResultSet fetchConfigureSessionResult(
+SessionHandle sessionHandle, OperationHandle operationHandle) {
+ResultSet firstResult = fetchResults(sessionHandle, operationHandle, 
0, Integer.MAX_VALUE);
+while (firstResult == ResultSet.NOT_READY_RESULTS) {
+firstResult = fetchResults(sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE);

Review Comment:
   It seems like operations like AddJar could be synchronous and take some 
period of time, right? 
   
   If that's the case, this will be a busy wait on any synchronous operations 
which take time, right?
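
A minimal sketch of one way to avoid spinning, assuming the fetch can be modelled as a call that returns an empty `Optional` while results are not ready. `PollingSketch`, `pollUntilReady`, and the delay parameters are hypothetical names for illustration only; none of them are part of the Flink SQL Gateway API or of this PR.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of the Flink SQL Gateway. */
final class PollingSketch {

    private PollingSketch() {}

    /**
     * Calls fetchOnce until it yields a value, sleeping between attempts.
     * The sleep doubles after every empty attempt and is capped at maxDelay,
     * so a slow operation is polled less and less often instead of in a tight loop.
     */
    static <T> T pollUntilReady(
            Supplier<Optional<T>> fetchOnce, Duration initialDelay, Duration maxDelay) {
        long delayMs = Math.max(1L, initialDelay.toMillis());
        long maxMs = Math.max(delayMs, maxDelay.toMillis());
        while (true) {
            Optional<T> result = fetchOnce.get();
            if (result.isPresent()) {
                return result.get();
            }
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for results", e);
            }
            delayMs = Math.min(maxMs, delayMs * 2);
        }
    }
}
```

In the loop quoted above, the supplier would stand in for the `fetchResults` call, mapping `ResultSet.NOT_READY_RESULTS` to an empty `Optional`. That mapping is an assumption of this sketch rather than something the PR defines.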






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021870936


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -311,8 +340,29 @@ public GatewayInfo getGatewayInfo() {
 return GatewayInfo.INSTANCE;
 }
 
+// 

+
 @VisibleForTesting
 public Session getSession(SessionHandle sessionHandle) {
 return sessionManager.getSession(sessionHandle);
 }
+
+/** Fetch all results for configuring session. */
+private ResultSet fetchConfigureSessionResult(

Review Comment:
   It seems like this function is general and could be used to fetch the 
results from any `OperationHandle`, right?
   
   Should it be factored out or maybe named something more general?






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021874117


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -79,6 +82,32 @@ public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException
 }
 }
 
+@Override
+public ResultSet configureSession(
+SessionHandle sessionHandle, String statement, long 
executionTimeoutMs)
+throws SqlGatewayException {
+try {
+if (executionTimeoutMs > 0) {
+// TODO: support the feature in FLINK-27838

Review Comment:
   As a small detail, FLINK-27838 says it is about a timeout for 
`executeStatement`.  Should the ticket be updated to note that 
`configureSession` also has a timeout?






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021876246


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -79,6 +82,32 @@ public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException
 }
 }
 
+@Override
+public ResultSet configureSession(
+SessionHandle sessionHandle, String statement, long 
executionTimeoutMs)
+throws SqlGatewayException {
+try {
+if (executionTimeoutMs > 0) {
+// TODO: support the feature in FLINK-27838
+throw new UnsupportedOperationException(
+"SqlGatewayService doesn't support timeout mechanism 
now.");
+}
+OperationHandle operationHandle =
+getSession(sessionHandle)
+.getOperationManager()
+.submitOperation(
+handle ->
+getSession(sessionHandle)
+.createExecutor()
+.configureSession(handle, 
statement));
+return fetchConfigureSessionResult(sessionHandle, operationHandle);

Review Comment:
   Are there any concurrency concerns here?
   
   I'm new to the codebase; seems like it ought to be ok, but figured I'd ask.
   
   To validate what I understand so far, I'm guessing that the SQL client will 
block on getting a response back.  Also, it looks like the 
`fetchConfigureSessionResult` will block on getting the result back.






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-14 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1021878225


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -79,6 +82,32 @@ public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException
 }
 }
 
+@Override
+public ResultSet configureSession(
+SessionHandle sessionHandle, String statement, long 
executionTimeoutMs)
+throws SqlGatewayException {
+try {
+if (executionTimeoutMs > 0) {
+// TODO: support the feature in FLINK-27838

Review Comment:
   As another thought, in the future, if a `configureSession` call 
*does time out*, it seems like it could be challenging to manage rolling back 
the configuration change as new requests are coming in.






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-15 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1023336842


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##
@@ -84,6 +90,30 @@ public OperationExecutor(SessionContext context, 
Configuration executionConfig)
 this.executionConfig = executionConfig;
 }
 
+public ResultFetcher configureSession(OperationHandle handle, String 
statement) {
+TableEnvironmentInternal tableEnv = getTableEnvironment();
+List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
+if (parsedOperations.size() > 1) {
+throw new UnsupportedOperationException(
+"Unsupported SQL statement! Execute statement only accepts 
a single SQL statement or "
++ "multiple 'INSERT INTO' statements wrapped in a 
'STATEMENT SET' block.");
+}
+Operation op = parsedOperations.get(0);
+if (!(op instanceof SetOperation)
+&& !(op instanceof ResetOperation)
+&& !(op instanceof CreateOperation)
+&& !(op instanceof DropOperation)
+&& !(op instanceof UseOperation)
+&& !(op instanceof AlterOperation)
+&& !(op instanceof LoadModuleOperation)
+&& !(op instanceof UnloadModuleOperation)
+&& !(op instanceof AddJarOperation)) {

Review Comment:
   Ok.  That makes some sense; I may need to learn more about what the jars are 
doing here to understand it better.  
   
   As long as leaving out RemoveJar is a thoughtful decision, I'm good with it.






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-15 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1023336995


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##
@@ -157,6 +169,80 @@ public void testOpenSessionWithEnvironment() throws 
Exception {
 assertThat(tableEnv.listModules()).contains(moduleName);
 }
 
+@Test
+public void testConfigureSessionWithLegalStatement(@TempDir 
java.nio.file.Path tmpDir)
+throws Exception {
+SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+// SET & RESET
+verifyConfigureSession(sessionHandle, "SET 'key1' = 'value1';");
+Map<String, String> config = new HashMap<>();
+config.put("key1", "value1");
+
assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(config);
+
+verifyConfigureSession(sessionHandle, "RESET 'key1';");
+
assertThat(service.getSessionConfig(sessionHandle)).doesNotContainEntry("key1", 
"value1");
+
+// CREATE & USE & ALTER & DROP
+verifyConfigureSession(
+sessionHandle,
+"CREATE CATALOG mycat with ('type' = 'generic_in_memory', 
'default-database' = 'db');");
+
+verifyConfigureSession(sessionHandle, "USE CATALOG mycat;");
+
assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("mycat");

Review Comment:
   Makes sense!
   






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-15 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1023338175


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -311,8 +340,29 @@ public GatewayInfo getGatewayInfo() {
 return GatewayInfo.INSTANCE;
 }
 
+// 

+
 @VisibleForTesting
 public Session getSession(SessionHandle sessionHandle) {
 return sessionManager.getSession(sessionHandle);
 }
+
+/** Fetch all results for configuring session. */
+private ResultSet fetchConfigureSessionResult(
+SessionHandle sessionHandle, OperationHandle operationHandle) {
+ResultSet firstResult = fetchResults(sessionHandle, operationHandle, 
0, Integer.MAX_VALUE);
+while (firstResult == ResultSet.NOT_READY_RESULTS) {
+firstResult = fetchResults(sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE);

Review Comment:
   Let me ask this question: if one of the operations on the gateway took some 
time, say 5 seconds, how many calls to the gateway would we expect to make 
here while waiting for the results to be ready?
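
To put rough numbers on the question, here is a small arithmetic sketch. It assumes polling starts at time 0 and repeats after a fixed sleep, and that each fetch call itself takes negligible time. `PollCountSketch` and `expectedPolls` are hypothetical names, not Flink code.

```java
/** Hypothetical arithmetic helper for illustration; not Flink code. */
final class PollCountSketch {

    private PollCountSketch() {}

    /**
     * Number of fetch calls made when polling at time 0 and then once every
     * sleepMs, stopping at the first poll at or after operationMs.
     */
    static long expectedPolls(long operationMs, long sleepMs) {
        if (operationMs < 0 || sleepMs <= 0) {
            throw new IllegalArgumentException("operationMs must be >= 0 and sleepMs > 0");
        }
        // ceil(operationMs / sleepMs) sleeps are needed before results are ready.
        long sleepsNeeded = (operationMs + sleepMs - 1) / sleepMs;
        // Add the initial poll at time 0.
        return sleepsNeeded + 1;
    }
}
```

Under these assumptions, a 5-second operation costs 51 calls with a 100 ms sleep and 6 calls with a 1-second sleep. With no sleep at all, as in the loop quoted above, the count is bounded only by how fast the loop can run.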






[GitHub] [flink] jnh5y commented on a diff in pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-11-16 Thread GitBox


jnh5y commented on code in PR #21133:
URL: https://github.com/apache/flink/pull/21133#discussion_r1024424268


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java:
##
@@ -311,8 +340,29 @@ public GatewayInfo getGatewayInfo() {
 return GatewayInfo.INSTANCE;
 }
 
+// 

+
 @VisibleForTesting
 public Session getSession(SessionHandle sessionHandle) {
 return sessionManager.getSession(sessionHandle);
 }
+
+/** Fetch all results for configuring session. */
+private ResultSet fetchConfigureSessionResult(
+SessionHandle sessionHandle, OperationHandle operationHandle) {
+ResultSet firstResult = fetchResults(sessionHandle, operationHandle, 
0, Integer.MAX_VALUE);
+while (firstResult == ResultSet.NOT_READY_RESULTS) {
+firstResult = fetchResults(sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE);

Review Comment:
   Sounds good and sounds like you and @fsk119 will figure out a good solution!


