This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c09cd65  [KYUUBI #2203][FLINK] Support flink conf set by kyuubi conf file
c09cd65 is described below

commit c09cd654bbc918178112a2b9d20c1a0e35154f95
Author: Ada <[email protected]>
AuthorDate: Thu Mar 24 23:35:02 2022 +0800

    [KYUUBI #2203][FLINK] Support flink conf set by kyuubi conf file
    
    ### _Why are the changes needed?_
    
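    Support setting Flink configuration through `kyuubi-defaults.conf`: Kyuubi conf entries prefixed with `flink.` are added to the Flink configuration with the prefix stripped (e.g. `flink.parallelism.default` becomes `parallelism.default`).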
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2206 from deadwind4/KYUUBI-2203.
    
    Closes #2203
    
    eeeb91cd [Ada] fix IT case
    5eefba57 [Ada] add IT case
    86fc57e1 [Ada] [KYUUBI #2203][engine/flink] Support flink conf set by kyuubi conf file
    
    Authored-by: Ada <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  5 ++-
 .../kyuubi/it/flink/FlinkSQLEngineSuite.scala      | 48 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 127faad..cce82bd 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -26,8 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, 
DefaultCLI, GenericCLI}
-import org.apache.flink.configuration.DeploymentOptions
-import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.configuration.{Configuration, DeploymentOptions, 
GlobalConfiguration}
 import org.apache.flink.table.client.SqlClientException
 import org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.util.JarUtils
@@ -77,6 +76,8 @@ object FlinkSQLEngine extends Logging {
     try {
       val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
       val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
+      val flinkConfFromKyuubi = kyuubiConf.getAllWithPrefix("flink", "")
+      flinkConf.addAll(Configuration.fromMap(flinkConfFromKyuubi.asJava))
 
       val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
       // set cluster name for per-job and application mode
diff --git 
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
 
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
new file mode 100644
index 0000000..814a6d1
--- /dev/null
+++ 
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, 
FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class FlinkSQLEngineSuite extends WithKyuubiServerAndFlinkMiniCluster with 
HiveJDBCTestHelper {
+
+  override val conf: KyuubiConf = KyuubiConf()
+    .set(ENGINE_TYPE, "FLINK_SQL")
+    .set(FRONTEND_THRIFT_BINARY_BIND_PORT, 10029)
+    .set("flink.parallelism.default", "6")
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  test("set kyuubi conf into flink conf") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("SET")
+      // Flink does not support set key without value currently,
+      // thus read all rows to find the desired one
+      var success = false
+      while (resultSet.next() && success == false) {
+        if (resultSet.getString(1) == "parallelism.default" &&
+          resultSet.getString(2) == "6") {
+          success = true
+        }
+      }
+      assert(success)
+    }
+  }
+}

Reply via email to