This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2353abe  KYLIN-4736 Upgrade flink version to 1.11.1 (#1462)
2353abe is described below

commit 2353abe84b2a60fe2dec0485d638a98e20244004
Author: chenjie-sau <chenjie.sau...@gmail.com>
AuthorDate: Tue Oct 27 10:35:07 2020 +0800

    KYLIN-4736 Upgrade flink version to 1.11.1 (#1462)
---
 build/bin/download-flink.sh                              | 13 ++++++-------
 .../java/org/apache/kylin/common/util/HadoopUtil.java    |  2 ++
 core-common/src/main/resources/kylin-defaults.properties |  5 ++---
 .../kylin/engine/flink/FlinkOnYarnConfigMapping.java     | 16 ++++------------
 .../kylin/engine/flink/FlinkOnYarnConfigMappingTest.java | 14 +++++++-------
 pom.xml                                                  |  2 +-
 6 files changed, 22 insertions(+), 30 deletions(-)

diff --git a/build/bin/download-flink.sh b/build/bin/download-flink.sh
index 4d2fdb8..c118a8f 100755
--- a/build/bin/download-flink.sh
+++ b/build/bin/download-flink.sh
@@ -35,12 +35,11 @@ if [[ `uname -a` =~ "Darwin" ]]; then
     alias md5cmd="md5 -q"
 fi
 
-flink_version="1.9.2"
+flink_version="1.11.1"
 scala_version="2.11"
-flink_shaded_version="10.0"
-hadoop_version="2.7.5"
-flink_pkg_md5="0718a04fe0a641cc5f5368124a4c54a5"
-flink_shaded_hadoop_md5="4287a314bfb09a3dc957cbda3f91d7ca"
+flink_shaded_hadoop_version="3.1.1.7.1.1.0-565-9.0"
+flink_pkg_md5="3b7aa59b44add1a0625737f6516e0929"
+flink_shaded_hadoop_md5="7b78e546dd93f4facd322921f29de1eb"
 
 if [ ! -f "flink-${flink_version}-bin-scala_${scala_version}.tgz" ]; then
     echo "No binary file found, start to download package to ${flink_package_dir}"
@@ -53,8 +52,8 @@ else
     fi
 fi
 
-flink_shaded_hadoop_jar="flink-shaded-hadoop-2-uber-${hadoop_version}-${flink_shaded_version}.jar"
-flink_shaded_hadoop_path="https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${hadoop_version}-${flink_shaded_version}/${flink_shaded_hadoop_jar}";
+flink_shaded_hadoop_jar="flink-shaded-hadoop-3-uber-${flink_shaded_hadoop_version}.jar"
+flink_shaded_hadoop_path="https://repository.cloudera.com/artifactory/libs-release-local/org/apache/flink/flink-shaded-hadoop-3-uber/${flink_shaded_hadoop_version}/${flink_shaded_hadoop_jar}";
 
 if [ ! -f $flink_shaded_hadoop_jar ]; then
   echo "Start to download $flink_shaded_hadoop_jar"
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 0f6da04..26d0ea3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -64,12 +64,14 @@ public class HadoopUtil {
             return conf;
         }
         Configuration conf = hadoopConfig.get();
+        conf.set("fs.hdfs.impl.disable.cache", "true");
         return conf;
     }
 
     public static Configuration healSickConfig(Configuration conf) {
         //  https://issues.apache.org/jira/browse/KYLIN-3064
         conf.set("yarn.timeline-service.enabled", "false");
+        conf.set("fs.hdfs.impl.disable.cache", "true");
         return conf;
     }
 
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index c16419a..ebf1cd2 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -352,10 +352,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
 ### FLINK ENGINE CONFIGS ###
 
 ## Flink conf (default is in flink/conf/flink-conf.yaml)
-kylin.engine.flink-conf.jobmanager.heap.size=2G
-kylin.engine.flink-conf.taskmanager.heap.size=4G
+kylin.engine.flink-conf.jobmanager.memory.process.size=2G
+kylin.engine.flink-conf.taskmanager.memory.process.size=4G
 kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1
-kylin.engine.flink-conf.taskmanager.memory.preallocate=false
 kylin.engine.flink-conf.job.parallelism=1
 kylin.engine.flink-conf.program.enableObjectReuse=false
 kylin.engine.flink-conf.yarn.queue=
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
index 154d4e2..a3d2a65 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -14,13 +14,14 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.engine.flink;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.FallbackKey;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -38,7 +39,7 @@ public class FlinkOnYarnConfigMapping {
         flinkOnYarnConfigMap = new HashMap<>();
 
         //mapping job manager heap size -> -yjm
-        ConfigOption<String> jmHeapSizeOption = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY;
+        ConfigOption<MemorySize> jmHeapSizeOption = JobManagerOptions.TOTAL_PROCESS_MEMORY;
         flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm");
         if (jmHeapSizeOption.hasFallbackKeys()) {
             Iterator<FallbackKey> deprecatedKeyIterator = jmHeapSizeOption.fallbackKeys().iterator();
@@ -48,7 +49,7 @@ public class FlinkOnYarnConfigMapping {
         }
 
         //mapping task manager heap size -> -ytm
-        ConfigOption<String> tmHeapSizeOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
+        ConfigOption<MemorySize> tmHeapSizeOption = TaskManagerOptions.TOTAL_PROCESS_MEMORY;
         flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm");
         if (tmHeapSizeOption.hasFallbackKeys()) {
             Iterator<FallbackKey> deprecatedKeyIterator = tmHeapSizeOption.fallbackKeys().iterator();
@@ -66,15 +67,6 @@ public class FlinkOnYarnConfigMapping {
             }
         }
 
-        ConfigOption<Boolean> tmMemoryPreallocate = TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE;
-        flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD taskmanager.memory.preallocate");
-        if (taskSlotNumOption.hasFallbackKeys()) {
-            Iterator<FallbackKey> deprecatedKeyIterator = tmMemoryPreallocate.fallbackKeys().iterator();
-            while (deprecatedKeyIterator.hasNext()) {
-                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yD taskmanager.memory.preallocate");
-            }
-        }
-
         //config options do not have mapping with config file key
         flinkOnYarnConfigMap.put("yarn.queue", "-yqu");
         flinkOnYarnConfigMap.put("yarn.nodelabel", "-ynl");
diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
index 3cb6f28..ca301db 100644
--- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
+++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.engine.flink;
 
 import org.apache.flink.configuration.FallbackKey;
@@ -40,10 +40,10 @@ public class FlinkOnYarnConfigMappingTest {
                 String flinkConfigOption = entry.getKey();
 
                 boolean matchedAnyOne;
-                matchedAnyOne = flinkConfigOption.equals(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key());
+                matchedAnyOne = flinkConfigOption.equals(JobManagerOptions.TOTAL_PROCESS_MEMORY.key());
                 if (!matchedAnyOne) {
-                    if (JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
-                        Iterator<FallbackKey> deprecatedKeyIterator = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY
+                    if (JobManagerOptions.TOTAL_PROCESS_MEMORY.hasFallbackKeys()) {
+                        Iterator<FallbackKey> deprecatedKeyIterator = JobManagerOptions.TOTAL_PROCESS_MEMORY
                                 .fallbackKeys().iterator();
                         while (deprecatedKeyIterator.hasNext()) {
                             matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
@@ -65,10 +65,10 @@ public class FlinkOnYarnConfigMappingTest {
                 String flinkConfigOption = entry.getKey();
 
                 boolean matchedAnyOne;
-                matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key());
+                matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key());
                 if (!matchedAnyOne) {
-                    if (TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasFallbackKeys()) {
-                        Iterator<FallbackKey> deprecatedKeyIterator = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY
+                    if (TaskManagerOptions.TOTAL_PROCESS_MEMORY.hasFallbackKeys()) {
+                        Iterator<FallbackKey> deprecatedKeyIterator = TaskManagerOptions.TOTAL_PROCESS_MEMORY
                                 .fallbackKeys().iterator();
                         while (deprecatedKeyIterator.hasNext()) {
                             matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey());
diff --git a/pom.xml b/pom.xml
index 1c7444e..ea348dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
     <kryo.version>4.0.0</kryo.version>
 
     <!-- Flink versions -->
-    <flink.version>1.9.2</flink.version>
+    <flink.version>1.11.1</flink.version>
 
     <!-- mysql versions -->
     <mysql-connector.version>5.1.8</mysql-connector.version>

Reply via email to