This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c09cd65 [KYUUBI #2203][FLINK] Support flink conf set by kyuubi conf
file
c09cd65 is described below
commit c09cd654bbc918178112a2b9d20c1a0e35154f95
Author: Ada <[email protected]>
AuthorDate: Thu Mar 24 23:35:02 2022 +0800
[KYUUBI #2203][FLINK] Support flink conf set by kyuubi conf file
### _Why are the changes needed?_
Support setting Flink configuration by setting `kyuubi-default.conf`.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2206 from deadwind4/KYUUBI-2203.
Closes #2203
eeeb91cd [Ada] fix IT case
5eefba57 [Ada] add IT case
86fc57e1 [Ada] [KYUUBI #2203][engine/flink] Support flink conf set by
kyuubi conf file
Authored-by: Ada <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 5 ++-
.../kyuubi/it/flink/FlinkSQLEngineSuite.scala | 48 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 2 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 127faad..cce82bd 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -26,8 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine,
DefaultCLI, GenericCLI}
-import org.apache.flink.configuration.DeploymentOptions
-import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.configuration.{Configuration, DeploymentOptions,
GlobalConfiguration}
import org.apache.flink.table.client.SqlClientException
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.util.JarUtils
@@ -77,6 +76,8 @@ object FlinkSQLEngine extends Logging {
try {
val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
+ val flinkConfFromKyuubi = kyuubiConf.getAllWithPrefix("flink", "")
+ flinkConf.addAll(Configuration.fromMap(flinkConfFromKyuubi.asJava))
val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
// set cluster name for per-job and application mode
diff --git
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
new file mode 100644
index 0000000..814a6d1
--- /dev/null
+++
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE,
FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class FlinkSQLEngineSuite extends WithKyuubiServerAndFlinkMiniCluster with
HiveJDBCTestHelper {
+
+ override val conf: KyuubiConf = KyuubiConf()
+ .set(ENGINE_TYPE, "FLINK_SQL")
+ .set(FRONTEND_THRIFT_BINARY_BIND_PORT, 10029)
+ .set("flink.parallelism.default", "6")
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("set kyuubi conf into flink conf") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SET")
+ // Flink does not support set key without value currently,
+ // thus read all rows to find the desired one
+ var success = false
+ while (resultSet.next() && success == false) {
+ if (resultSet.getString(1) == "parallelism.default" &&
+ resultSet.getString(2) == "6") {
+ success = true
+ }
+ }
+ assert(success)
+ }
+ }
+}