[flink] branch master updated: [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)

2019-05-13 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2826ff8  [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
2826ff8 is described below

commit 2826ff80c4b056d7af589649238b3acabca43837
Author: Jing Zhang 
AuthorDate: Tue May 14 09:21:39 2019 +0800

[FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
---
 .../PlannerResolvedFieldReference.scala|  26 ++
 .../logical/FlinkLogicalTableSourceScan.scala  |  15 +-
 .../physical/batch/BatchExecTableSourceScan.scala  |  62 +++-
 .../stream/StreamExecTableSourceScan.scala | 148 +-
 .../apache/flink/table/plan/util/ScanUtil.scala|   2 +-
 .../flink/table/sources/TableSourceUtil.scala  | 321 -
 .../table/sources/definedTimeAttributes.scala  |  95 ++
 .../table/sources/tsextractors/ExistingField.scala | 108 +++
 .../sources/wmstrategies/AscendingTimestamps.scala |  49 
 .../sources/wmstrategies/watermarkStrategies.scala |  81 ++
 .../table/runtime/batch/sql/TableScanITCase.scala  | 119 
 .../table/runtime/stream/sql/TableScanITCase.scala | 144 +
 .../apache/flink/table/util/testTableSources.scala | 106 +++
 13 files changed, 1254 insertions(+), 22 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
new file mode 100644
index 000..a3406f8
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class PlannerResolvedFieldReference(
+name: String,
+resultType: TypeInformation[_],
+fieldIndex: Int) extends ResolvedFieldReference
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index cb94b3a..7b2b7a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
-import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import 
org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan
@@ -49,7 +48,9 @@ class FlinkLogicalTableSourceScan(
   extends TableScan(cluster, traitSet, relOptTable)
   with FlinkLogicalRel {
 
-  val tableSource: TableSource[_] = 
relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+  lazy val tableSource: TableSource[_] = tableSourceTable.tableSource
+
+  private lazy val tableSourceTable = 
relOptTable.unwrap(classOf[TableSourceTable[_]])
 
   def copy(
   traitSet: RelTraitSet,
@@ -63,15 +64,7 @@ class FlinkLogicalTableSourceScan(
 
   override def deriveRowType(): RelDataType = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-tableSource match {
-  case s: StreamTableSource[_] =>
-TableSourceUtil.getRelDataType(s, None, streaming = true, 
flinkTypeFactory)
-  case _: BatchTableSource[_] =>
-flinkTypeFactory.buildLogicalRowType(
-  tableSource.getTableSchema, isStreaming = Op
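
The new PlannerResolvedFieldReference case class added in this commit (shown in full above) simply bundles a field's name, type information and index. A minimal, hedged usage sketch follows; the field name "rowtime", the SQL timestamp type and the index 2 are illustrative values, not taken from the commit:

    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.table.expressions.PlannerResolvedFieldReference

    object FieldRefDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical values: the third field of a schema, named "rowtime",
        // resolved as a SQL timestamp.
        val rowtimeRef = PlannerResolvedFieldReference("rowtime", Types.SQL_TIMESTAMP, 2)
        println(rowtimeRef.name + " -> " + rowtimeRef.resultType)
      }
    }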

[flink] branch master updated: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-13 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f919b23  [FLINK-12233][hive] Support table related operations in HiveCatalog
f919b23 is described below

commit f919b23890913aa8d3f6241a1d70ef82006863fc
Author: bowen.li 
AuthorDate: Tue May 7 11:52:17 2019 -0700

[FLINK-12233][hive] Support table related operations in HiveCatalog

This PR introduced HiveCatalogTable and implemented table related catalog APIs in HiveCatalog.

This closes #8353.
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 238 ++---
 .../flink/table/catalog/hive/HiveCatalog.java  | 110 +++---
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 190 
 .../flink/table/catalog/hive/HiveCatalogTable.java | 105 +
 .../flink/table/catalog/hive/HiveCatalogUtil.java  |   1 -
 .../flink/table/catalog/hive/HiveTableConfig.java  |   4 +-
 .../hive/util/GenericHiveMetastoreCatalogUtil.java | 171 ---
 .../HiveTableUtil.java}|  31 ++-
 .../hive/GenericHiveMetastoreCatalogTest.java  |   3 +-
 .../flink/table/catalog/hive/HiveCatalogTest.java  | 110 --
 .../flink/table/catalog/hive/HiveTestUtils.java|   7 +
 .../table/catalog/GenericInMemoryCatalogTest.java  |   4 +-
 .../flink/table/catalog/CatalogTestBase.java   |  33 ++-
 .../flink/table/catalog/CatalogTestUtil.java   |  13 +-
 14 files changed, 594 insertions(+), 426 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 843b4a2..c6d14f4 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -33,25 +38,28 @@ import 
org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import 
org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
@@ -59,6 +67,10 @@ import java.util.List;
 public class GenericHiveMetastoreCatalog extends HiveCatalog
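
A rough usage sketch of the table-related operations this commit implements in HiveCatalog. The createTable/getTable calls, the ObjectPath constructor and the third ignoreIfExists argument are assumed from the catalog API this work targets and may not match the committed signatures exactly; treat it as illustration only:

    import org.apache.flink.table.catalog.{CatalogBaseTable, ObjectPath}
    import org.apache.flink.table.catalog.hive.HiveCatalog

    object HiveCatalogSketch {
      // Create a table under the "default" database and read it back.
      def createAndReadBack(catalog: HiveCatalog, table: CatalogBaseTable): Unit = {
        val path = new ObjectPath("default", "orders")
        catalog.createTable(path, table, false)   // assumed ignoreIfExists flag
        println(catalog.getTable(path).getSchema)
      }
    }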

[flink] branch master updated: [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common

2019-05-13 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e8df4fb  [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common
e8df4fb is described below

commit e8df4fb1e50ffc353e807eaf1aa3ee106b16559a
Author: Dawid Wysakowicz 
AuthorDate: Tue May 7 18:09:37 2019 +0200

[hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common

This closes #8363.
---
 .../table/typeutils/TimeIndicatorTypeInfo.java | 77 ++
 .../table/typeutils/TimeIndicatorTypeInfo.scala| 59 -
 2 files changed, 77 insertions(+), 59 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
new file mode 100644
index 000..e46bf3f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+
+import java.sql.Timestamp;
+
+/**
+ * Type information for indicating event or processing time. However, it 
behaves like a
+ * regular SQL timestamp but is serialized as Long.
+ */
+@Internal
+public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo {
+
+   private final boolean isEventTime;
+
+   public static final int ROWTIME_STREAM_MARKER = -1;
+   public static final int PROCTIME_STREAM_MARKER = -2;
+
+   public static final int ROWTIME_BATCH_MARKER = -3;
+   public static final int PROCTIME_BATCH_MARKER = -4;
+
+   public static final TimeIndicatorTypeInfo ROWTIME_INDICATOR = new 
TimeIndicatorTypeInfo(true);
+   public static final TimeIndicatorTypeInfo PROCTIME_INDICATOR = new 
TimeIndicatorTypeInfo(false);
+
+   @SuppressWarnings("unchecked")
+   protected TimeIndicatorTypeInfo(boolean isEventTime) {
+   super(Timestamp.class, SqlTimestampSerializer.INSTANCE, (Class) 
SqlTimestampComparator.class);
+   this.isEventTime = isEventTime;
+   }
+
+   // this replaces the effective serializer by a LongSerializer
+   // it is a hacky but efficient solution to keep the object creation 
overhead low but still
+   // be compatible with the corresponding SqlTimestampTypeInfo
+   @Override
+   @SuppressWarnings("unchecked")
+   public TypeSerializer createSerializer(ExecutionConfig 
executionConfig) {
+   return (TypeSerializer) LongSerializer.INSTANCE;
+   }
+
+   public boolean isEventTime() {
+   return isEventTime;
+   }
+
+   @Override
+   public String toString() {
+   if (isEventTime) {
+   return "TimeIndicatorTypeInfo(rowtime)";
+   } else {
+   return "TimeIndicatorTypeInfo(proctime)";
+   }
+   }
+}
+
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
deleted file mode 100644
index ad82d52..000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding cop
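
As the ported class's comments note, a time indicator behaves like a regular SQL timestamp type but swaps the effective serializer for a LongSerializer. A small sketch of what that looks like from user code, using only the public constants and methods shown in the diff above:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

    object TimeIndicatorDemo {
      def main(args: Array[String]): Unit = {
        val rowtime = TimeIndicatorTypeInfo.ROWTIME_INDICATOR
        println(rowtime.isEventTime)   // true
        println(rowtime)               // TimeIndicatorTypeInfo(rowtime)
        // The effective serializer is the LongSerializer singleton rather than
        // SqlTimestampSerializer, as createSerializer() in the ported class returns.
        println(rowtime.createSerializer(new ExecutionConfig))
      }
    }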

[flink] branch master updated: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

2019-05-13 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 0e953d1  [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
0e953d1 is described below

commit 0e953d10c2cdc61dc978a13c0c94320034bf0179
Author: Jeff Zhang 
AuthorDate: Wed Apr 10 22:53:25 2019 +0800

[FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

Remove setting of yarn.minicluster.fixed.ports

copy yarn-site.xml to target/test-classes

This commit closes #8144.
---
 .../java/org/apache/flink/yarn/YarnTestBase.java   | 21 +++-
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 38 +-
 .../src/main/java/org/apache/flink/yarn/Utils.java | 12 ---
 3 files changed, 35 insertions(+), 36 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index bd46dc4..9aad148 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -154,6 +154,8 @@ public abstract class YarnTestBase extends TestLogger {
protected static File tempConfPathForSecureRun = null;
protected static File flinkShadedHadoopDir;
 
+   protected static File yarnSiteXML = null;
+
private YarnClient yarnClient = null;
 
private static org.apache.flink.configuration.Configuration 
globalConfiguration;
@@ -166,7 +168,6 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION = new YarnConfiguration();

YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
32);

YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 
4096); // 4096 is the available memory anyways
-   
YARN_CONFIGURATION.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, 
true);

YARN_CONFIGURATION.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
 true);
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 
2);

YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
@@ -340,15 +341,14 @@ public abstract class YarnTestBase extends TestLogger {
}
}
 
-   public static File writeYarnSiteConfigXML(Configuration yarnConf) 
throws IOException {
-   tmp.create();
-   File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + 
"/yarn-site.xml");
-
+   // write yarn-site.xml to target/test-classes so that flink pick can 
pick up this when
+   // initializing YarnClient properly from classpath
+   public static void writeYarnSiteConfigXML(Configuration yarnConf, File 
targetFolder) throws IOException {
+   yarnSiteXML = new File(targetFolder, "/yarn-site.xml");
try (FileWriter writer = new FileWriter(yarnSiteXML)) {
yarnConf.writeXml(writer);
writer.flush();
}
-   return yarnSiteXML;
}
 
/**
@@ -584,9 +584,10 @@ public abstract class YarnTestBase extends TestLogger {
 
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
 
-   File yarnConfFile = writeYarnSiteConfigXML(conf);
-   map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
+   File targetTestClassesFolder = new 
File("target/test-classes");
+   writeYarnSiteConfigXML(conf, targetTestClassesFolder);
map.put("IN_TESTS", "yes we are in tests"); // see 
YarnClusterDescriptor() for more infos
+   map.put("YARN_CONF_DIR", 
targetTestClassesFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
 
Assert.assertTrue(yarnCluster.getServiceState() == 
Service.STATE.STARTED);
@@ -890,6 +891,10 @@ public abstract class YarnTestBase extends TestLogger {
tempConfPathForSecureRun = null;
}
 
+   if (yarnSiteXML != null) {
+   yarnSiteXML.delete();
+   }
+
// When we are on travis, we copy the temp files of JUnit 
(containing the MiniYARNCluster log files)
// to /target/flink-yarn-tests-*.
// The files from there are picked up by the 
./tools/travis_watchdog.sh script
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 23fddd5..20b5417 
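
The essence of the YarnTestBase change is writing the mini cluster's configuration as yarn-site.xml into target/test-classes so that a YarnClient initialized from the classpath picks it up. A self-contained sketch of that step, assuming Hadoop's org.apache.hadoop.conf.Configuration is on the classpath; the object and method names here are placeholders, not the committed code:

    import java.io.{File, FileWriter}
    import org.apache.hadoop.conf.Configuration

    object YarnSiteWriter {
      // Writes the given YARN configuration as yarn-site.xml into targetFolder,
      // e.g. target/test-classes, so a classpath-initialized YarnClient finds it.
      def writeYarnSiteXml(yarnConf: Configuration, targetFolder: File): File = {
        val yarnSiteXML = new File(targetFolder, "yarn-site.xml")
        val writer = new FileWriter(yarnSiteXML)
        try {
          yarnConf.writeXml(writer)
          writer.flush()
        } finally {
          writer.close()
        }
        yarnSiteXML
      }
    }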

[flink] branch release-1.8 updated: [FLINK-12301] Fix ScalaCaseClassSerializer to support value types

2019-05-13 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new ecc6639  [FLINK-12301] Fix ScalaCaseClassSerializer to support value types
ecc6639 is described below

commit ecc6639053cb36672ce552bb7626f75ff98b8293
Author: Igal Shilman 
AuthorDate: Mon May 13 11:00:37 2019 +0200

[FLINK-12301] Fix ScalaCaseClassSerializer to support value types

We now use Scala reflection because it correctly deals with Scala
language features.
---
 .../scala/typeutils/ScalaCaseClassSerializer.scala | 65 +++---
 .../ScalaCaseClassSerializerReflectionTest.scala   | 41 ++
 2 files changed, 50 insertions(+), 56 deletions(-)

diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
index 7ff1427..fbaa2ac 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
@@ -18,15 +18,14 @@
 
 package org.apache.flink.api.scala.typeutils
 
+import java.io.ObjectInputStream
+
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot
 import 
org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer
 import org.apache.flink.api.common.typeutils._
 import 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
 import 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.lookupConstructor
 
-import java.io.ObjectInputStream
-import java.lang.invoke.{MethodHandle, MethodHandles}
-
 import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe
 
@@ -38,16 +37,16 @@ import scala.reflect.runtime.universe
   */
 @SerialVersionUID(1L)
 class ScalaCaseClassSerializer[T <: Product](
-  clazz: Class[T],
-  scalaFieldSerializers: Array[TypeSerializer[_]]
-) extends CaseClassSerializer[T](clazz, scalaFieldSerializers)
-with SelfResolvingTypeSerializer[T] {
+clazz: Class[T],
+scalaFieldSerializers: Array[TypeSerializer[_]]
+) extends CaseClassSerializer[T](clazz, scalaFieldSerializers)
+  with SelfResolvingTypeSerializer[T] {
 
   @transient
   private var constructor = lookupConstructor(clazz)
 
   override def createInstance(fields: Array[AnyRef]): T = {
-constructor.invoke(fields).asInstanceOf[T]
+constructor(fields)
   }
 
   override def snapshotConfiguration(): TypeSerializerSnapshot[T] = {
@@ -55,8 +54,7 @@ class ScalaCaseClassSerializer[T <: Product](
   }
 
   override def resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(
-s: TypeSerializerConfigSnapshot[T]
-  ): TypeSerializerSchemaCompatibility[T] = {
+  s: TypeSerializerConfigSnapshot[T]): 
TypeSerializerSchemaCompatibility[T] = {
 
 require(s.isInstanceOf[TupleSerializerConfigSnapshot[_]])
 
@@ -85,22 +83,8 @@ class ScalaCaseClassSerializer[T <: Product](
 
 object ScalaCaseClassSerializer {
 
-  def lookupConstructor[T](clazz: Class[_]): MethodHandle = {
-val types = findPrimaryConstructorParameterTypes(clazz, 
clazz.getClassLoader)
-
-val constructor = clazz.getConstructor(types: _*)
-
-val handle = MethodHandles
-  .lookup()
-  .unreflectConstructor(constructor)
-  .asSpreader(classOf[Array[AnyRef]], types.length)
-
-handle
-  }
-
-  private def findPrimaryConstructorParameterTypes(cls: Class[_], cl: 
ClassLoader):
-  List[Class[_]] = {
-val rootMirror = universe.runtimeMirror(cl)
+  def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T = {
+val rootMirror = universe.runtimeMirror(cls.getClassLoader)
 val classSymbol = rootMirror.classSymbol(cls)
 
 require(
@@ -113,30 +97,21 @@ object ScalaCaseClassSerializer {
  |""".stripMargin
 )
 
-val primaryConstructorSymbol = 
findPrimaryConstructorMethodSymbol(classSymbol)
-val scalaTypes = getArgumentsTypes(primaryConstructorSymbol)
-scalaTypes.map(tpe => scalaTypeToJavaClass(rootMirror)(tpe))
-  }
-
-  private def findPrimaryConstructorMethodSymbol(classSymbol: 
universe.ClassSymbol):
-  universe.MethodSymbol = {
-classSymbol.toType
+val primaryConstructorSymbol = classSymbol.toType
   .decl(universe.termNames.CONSTRUCTOR)
   .alternatives
+  .collectFirst({
+case constructorSymbol: universe.MethodSymbol if 
constructorSymbol.isPrimaryConstructor =>
+  constructorSymbol
+  })
   .head
   .asMethod
-  }
 
-  private def getArgumentsTypes(primaryConstructorSymbol: 
universe.MethodSymbol):
-  List[universe.Type] = {
-primaryConstructorSymbol.typeSignature
-  .paramLists
-  .head
-  .map(sy
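
The reworked lookupConstructor resolves the case class's primary constructor through Scala runtime reflection instead of java.lang.invoke method handles. A self-contained sketch of that reflection pattern on a toy top-level case class (Point is an assumption for illustration, not part of the commit):

    import scala.reflect.runtime.{universe => ru}

    case class Point(x: Int, y: Int)

    object PrimaryCtorLookup {
      def main(args: Array[String]): Unit = {
        val mirror      = ru.runtimeMirror(classOf[Point].getClassLoader)
        val classSymbol = mirror.classSymbol(classOf[Point])
        // Pick the primary constructor among all declared constructors, as the
        // new lookupConstructor does.
        val primaryCtor = classSymbol.toType
          .decl(ru.termNames.CONSTRUCTOR)
          .alternatives
          .collectFirst { case m: ru.MethodSymbol if m.isPrimaryConstructor => m }
          .get
        val ctorMirror = mirror.reflectClass(classSymbol).reflectConstructor(primaryCtor)
        println(ctorMirror(1, 2).asInstanceOf[Point])   // Point(1,2)
      }
    }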

[flink] branch master updated: [FLINK-12301] Fix ScalaCaseClassSerializer to support value types

2019-05-13 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9caf2c4  [FLINK-12301] Fix ScalaCaseClassSerializer to support value types
9caf2c4 is described below

commit 9caf2c4355f851c7a8ca2b1fe9a1c6dab7bd95e3
Author: Igal Shilman 
AuthorDate: Mon May 13 11:00:37 2019 +0200

[FLINK-12301] Fix ScalaCaseClassSerializer to support value types

We now use Scala reflection because it correctly deals with Scala
language features.
---
 .../scala/typeutils/ScalaCaseClassSerializer.scala | 65 +++---
 .../ScalaCaseClassSerializerReflectionTest.scala   | 41 ++
 2 files changed, 50 insertions(+), 56 deletions(-)

diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
index 7ff1427..fbaa2ac 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala
@@ -18,15 +18,14 @@
 
 package org.apache.flink.api.scala.typeutils
 
+import java.io.ObjectInputStream
+
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot
 import 
org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer
 import org.apache.flink.api.common.typeutils._
 import 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
 import 
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.lookupConstructor
 
-import java.io.ObjectInputStream
-import java.lang.invoke.{MethodHandle, MethodHandles}
-
 import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe
 
@@ -38,16 +37,16 @@ import scala.reflect.runtime.universe
   */
 @SerialVersionUID(1L)
 class ScalaCaseClassSerializer[T <: Product](
-  clazz: Class[T],
-  scalaFieldSerializers: Array[TypeSerializer[_]]
-) extends CaseClassSerializer[T](clazz, scalaFieldSerializers)
-with SelfResolvingTypeSerializer[T] {
+clazz: Class[T],
+scalaFieldSerializers: Array[TypeSerializer[_]]
+) extends CaseClassSerializer[T](clazz, scalaFieldSerializers)
+  with SelfResolvingTypeSerializer[T] {
 
   @transient
   private var constructor = lookupConstructor(clazz)
 
   override def createInstance(fields: Array[AnyRef]): T = {
-constructor.invoke(fields).asInstanceOf[T]
+constructor(fields)
   }
 
   override def snapshotConfiguration(): TypeSerializerSnapshot[T] = {
@@ -55,8 +54,7 @@ class ScalaCaseClassSerializer[T <: Product](
   }
 
   override def resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(
-s: TypeSerializerConfigSnapshot[T]
-  ): TypeSerializerSchemaCompatibility[T] = {
+  s: TypeSerializerConfigSnapshot[T]): 
TypeSerializerSchemaCompatibility[T] = {
 
 require(s.isInstanceOf[TupleSerializerConfigSnapshot[_]])
 
@@ -85,22 +83,8 @@ class ScalaCaseClassSerializer[T <: Product](
 
 object ScalaCaseClassSerializer {
 
-  def lookupConstructor[T](clazz: Class[_]): MethodHandle = {
-val types = findPrimaryConstructorParameterTypes(clazz, 
clazz.getClassLoader)
-
-val constructor = clazz.getConstructor(types: _*)
-
-val handle = MethodHandles
-  .lookup()
-  .unreflectConstructor(constructor)
-  .asSpreader(classOf[Array[AnyRef]], types.length)
-
-handle
-  }
-
-  private def findPrimaryConstructorParameterTypes(cls: Class[_], cl: 
ClassLoader):
-  List[Class[_]] = {
-val rootMirror = universe.runtimeMirror(cl)
+  def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T = {
+val rootMirror = universe.runtimeMirror(cls.getClassLoader)
 val classSymbol = rootMirror.classSymbol(cls)
 
 require(
@@ -113,30 +97,21 @@ object ScalaCaseClassSerializer {
  |""".stripMargin
 )
 
-val primaryConstructorSymbol = 
findPrimaryConstructorMethodSymbol(classSymbol)
-val scalaTypes = getArgumentsTypes(primaryConstructorSymbol)
-scalaTypes.map(tpe => scalaTypeToJavaClass(rootMirror)(tpe))
-  }
-
-  private def findPrimaryConstructorMethodSymbol(classSymbol: 
universe.ClassSymbol):
-  universe.MethodSymbol = {
-classSymbol.toType
+val primaryConstructorSymbol = classSymbol.toType
   .decl(universe.termNames.CONSTRUCTOR)
   .alternatives
+  .collectFirst({
+case constructorSymbol: universe.MethodSymbol if 
constructorSymbol.isPrimaryConstructor =>
+  constructorSymbol
+  })
   .head
   .asMethod
-  }
 
-  private def getArgumentsTypes(primaryConstructorSymbol: 
universe.MethodSymbol):
-  List[universe.Type] = {
-primaryConstructorSymbol.typeSignature
-  .paramLists
-  .head
-  .map(symbol => sy

[flink] branch master updated: [FLINK-12164][runtime] Harden JobMasterTest against timeouts

2019-05-13 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9c2bcae  [FLINK-12164][runtime] Harden JobMasterTest against timeouts
9c2bcae is described below

commit 9c2bcae735cd00336af3284e4f631afe061552ba
Author: Chesnay Schepler 
AuthorDate: Thu May 9 17:01:37 2019 +0200

[FLINK-12164][runtime] Harden JobMasterTest against timeouts

This closes #8388.
---
 .../src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 52792f8..ff9d6ed 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -202,7 +202,7 @@ public class JobMasterTest extends TestLogger {
private static final Time testingTimeout = Time.seconds(10L);
 
private static final long fastHeartbeatInterval = 1L;
-   private static final long fastHeartbeatTimeout = 5L;
+   private static final long fastHeartbeatTimeout = 10L;
 
private static final long heartbeatInterval = 1000L;
private static final long heartbeatTimeout = 5_000_000L;



[flink] branch master updated: [FLINK-12493] Add step to export Hadoop classpath to docs

2019-05-13 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 518616e  [FLINK-12493] Add step to export Hadoop classpath to docs
518616e is described below

commit 518616e9f3527b2564a640b38c80db3eb54bef37
Author: Alberto Romero 
AuthorDate: Mon May 13 11:04:17 2019 +0100

[FLINK-12493] Add step to export Hadoop classpath to docs

Github PR #8422
---
 docs/ops/deployment/aws.md | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/docs/ops/deployment/aws.md b/docs/ops/deployment/aws.md
index b465340..16e9bd7 100644
--- a/docs/ops/deployment/aws.md
+++ b/docs/ops/deployment/aws.md
@@ -56,7 +56,13 @@ when creating an EMR cluster.
 After creating your cluster, you can [connect to the master 
node](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-connect-master-node.html)
 and install Flink:
 
 1. Go the [Downloads Page]({{ site.download_url }}) and **download a binary 
version of Flink matching the Hadoop version** of your EMR cluster, e.g. Hadoop 
2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0.
-2. Extract the Flink distribution and you are ready to deploy [Flink jobs via 
YARN](yarn_setup.html) after **setting the Hadoop config directory**:
+2. Make sure all the Hadoop dependencies are in the classpath before you 
submit any jobs to EMR:
+
+{% highlight bash %}
+export HADOOP_CLASSPATH=`hadoop classpath`
+{% endhighlight %}
+
+3. Extract the Flink distribution and you are ready to deploy [Flink jobs via 
YARN](yarn_setup.html) after **setting the Hadoop config directory**:
 
 {% highlight bash %}
 HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 
examples/streaming/WordCount.jar



[flink] branch master updated: [FLINK-12495][python][client] Move PythonGatewayServer into flink-clients.

2019-05-13 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d1542e9  [FLINK-12495][python][client] Move PythonGatewayServer into flink-clients.
d1542e9 is described below

commit d1542e9561c6235feb902c9c6d781ba416b8f784
Author: sunjincheng121 
AuthorDate: Mon May 13 14:50:37 2019 +0800

[FLINK-12495][python][client] Move PythonGatewayServer into flink-clients.

This closes #8423
---
 flink-clients/pom.xml  | 7 +++
 .../java/org/apache/flink/client}/python/PythonGatewayServer.java  | 2 +-
 flink-core/pom.xml | 7 ---
 flink-python/pyflink/java_gateway.py   | 2 +-
 pom.xml| 1 +
 5 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 6476798..514b6cd 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -68,6 +68,13 @@ under the License.
commons-cli

 
+   
+   
+   net.sf.py4j
+   py4j
+   ${py4j.version}
+   
+

 

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/python/PythonGatewayServer.java 
b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
similarity index 98%
rename from 
flink-core/src/main/java/org/apache/flink/api/python/PythonGatewayServer.java
rename to 
flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
index 3e17401..6432a67 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/python/PythonGatewayServer.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.python;
+package org.apache.flink.client.python;
 
 import py4j.GatewayServer;
 
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index c6ff872..0768e81 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -86,13 +86,6 @@ under the License.
flink-shaded-guava

 
-   
-   
-   net.sf.py4j
-   py4j
-   0.10.8.1
-   
-

 

diff --git a/flink-python/pyflink/java_gateway.py 
b/flink-python/pyflink/java_gateway.py
index 5c931de..ed1dc89 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -62,7 +62,7 @@ def launch_gateway():
 raise Exception("Windows system is not supported currently.")
 script = "./bin/pyflink-gateway-server.sh"
 command = [os.path.join(FLINK_HOME, script)]
-command += ['-c', 'org.apache.flink.api.python.PythonGatewayServer']
+command += ['-c', 'org.apache.flink.client.python.PythonGatewayServer']
 
 # Create a temporary directory where the gateway server should write the 
connection information.
 conn_info_dir = tempfile.mkdtemp()
diff --git a/pom.xml b/pom.xml
index 539e9d0..ed9ae64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@ under the License.
2.21.0
2.0.0-RC.4
1.3
+   0.10.8.1
false
validate
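
For context on the moved class and the py4j dependency it brings into flink-clients: a py4j GatewayServer is what gives the Python side an entry point into the JVM. A bare-bones sketch follows; it is not the moved PythonGatewayServer, which additionally writes its connection information to a file for pyflink to read:

    import py4j.GatewayServer

    object MiniGateway {
      def main(args: Array[String]): Unit = {
        // Port 0 lets py4j pick an ephemeral port; a real gateway would publish
        // the chosen port so the Python process knows where to connect.
        val server = new GatewayServer(null, 0)
        server.start()
        println(s"py4j gateway listening on port ${server.getListeningPort}")
      }
    }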

[flink] branch master updated: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 754cd71d) into Chinese documents

2019-05-13 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3f532e1  [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 754cd71d) into Chinese documents
3f532e1 is described below

commit 3f532e18b96b83abdde189c4304f66c60b285d5c
Author: Jark Wu 
AuthorDate: Tue May 7 10:11:08 2019 +0800

[FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 754cd71d) into Chinese documents

This closes #8354
---
 docs/_includes/sidenav.html |   5 +-
 docs/dev/api_concepts.zh.md |   2 +-
 docs/dev/connectors/kafka.md|   2 +-
 docs/dev/connectors/kafka.zh.md |   2 +-
 docs/dev/execution.zh.md|   2 +-
 docs/dev/execution_configuration.zh.md  |   2 +-
 docs/dev/execution_plans.zh.md  |   2 +-
 docs/dev/packaging.zh.md|   4 +-
 docs/dev/parallel.zh.md |   2 +-
 docs/dev/restart_strategies.zh.md   |   2 +-
 docs/dev/stream/state/queryable_state.zh.md |  44 +--
 docs/dev/table/functions.zh.md  | 402 +++-
 docs/dev/table/tableApi.zh.md   | 294 +++-
 docs/ops/deployment/yarn_setup.zh.md|   1 +
 docs/ops/index.zh.md|   2 +-
 15 files changed, 673 insertions(+), 95 deletions(-)

diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 247a0e3..7f200b1 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -116,7 +116,10 @@ level is determined by 'nav-pos'.
 {% capture collapse_target %}"#collapse-{{ i }}" 
data-toggle="collapse"{% if active %} class="active"{% endif %}{% endcapture %}
 {% capture expand %}{% unless active %} {% endunless %}{% 
endcapture %}
 {{ title }}{{ expand }}
-  {% if this.nav-show_overview %}Overview{% endif %}
+{% if this.nav-show_overview %}
+  
+{% if page.is_default_language %}Overview{% else %}概览{% endif 
%}
+{% endif %}
 {% assign elements = elements | push: children %}
 {% assign elementsPosStack = elementsPosStack | push: elementsPos %}
 {% assign posStack = posStack | push: pos %}
diff --git a/docs/dev/api_concepts.zh.md b/docs/dev/api_concepts.zh.md
index bd7ca5a..619ee70 100644
--- a/docs/dev/api_concepts.zh.md
+++ b/docs/dev/api_concepts.zh.md
@@ -783,7 +783,7 @@ defined in the `write()`and `readFields()` methods will be 
used for serializatio
 
 You can use special types, including Scala's `Either`, `Option`, and `Try`.
 The Java API has its own custom implementation of `Either`.
-Similarly to Scala's `Either`, it represents a value of one two possible 
types, *Left* or *Right*.
+Similarly to Scala's `Either`, it represents a value of two possible types, 
*Left* or *Right*.
 `Either` can be useful for error handling or operators that need to output two 
different types of records.
 
  Type Erasure & Type Inference
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index e4e4d5d..8f893c4 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -88,7 +88,7 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
 >= 1.0.0
 
 This universal Kafka connector attempts to track the latest version of 
the Kafka client.
-The version of the client it uses may change between Flink releases. 
As of this release, it uses the Kafka 2.2.0 client.
+The version of the client it uses may change between Flink releases. 
Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client.
 Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later.
 However for Kafka 0.11.x and 0.10.x versions, we recommend using 
dedicated
 flink-connector-kafka-0.11{{ site.scala_version_suffix }} and 
flink-connector-kafka-0.10{{ site.scala_version_suffix }} respectively.
diff --git a/docs/dev/connectors/kafka.zh.md b/docs/dev/connectors/kafka.zh.md
index 2412277..28f3a25 100644
--- a/docs/dev/connectors/kafka.zh.md
+++ b/docs/dev/connectors/kafka.zh.md
@@ -88,7 +88,7 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
 >= 1.0.0
 
 This universal Kafka connector attempts to track the latest version of 
the Kafka client.
-The version of the client it uses may change between Flink releases.
+The version of the client it uses may change between Flink releases. 
Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client.
 Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later.
 However for Kafka 0.11.x and 0.10.x versions, we recommend using 
dedicated
 

[flink] branch release-1.6 updated: [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators

2019-05-13 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new 0dda6fe  [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
0dda6fe is described below

commit 0dda6fe9dff4f667b110cda39bfe9738ba615b24
Author: Congxian Qiu 
AuthorDate: Mon May 13 16:29:34 2019 +0800

[FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators

This closes #8338.
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  16 +-
 .../tasks/OneInputStreamTaskTestHarness.java   |  43 +++-
 .../runtime/tasks/StreamConfigChainer.java |  23 +-
 .../runtime/tasks/StreamMockEnvironment.java   |   8 +-
 .../runtime/tasks/StreamTaskTestHarness.java   |  21 +-
 .../state/StatefulOperatorChainedTaskTest.java | 259 +
 6 files changed, 357 insertions(+), 13 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985..62c540f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -270,6 +270,9 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
/** Shared wrapper for batch writes to the RocksDB instance. */
private RocksDBWriteBatchWrapper writeBatchWrapper;
 
+   /** The local directory name of the current snapshot strategy. */
+   private final String localDirectoryName;
+
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -319,6 +322,7 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+   this.localDirectoryName = 
this.backendUID.toString().replaceAll("[\\-]", "");
 
this.snapshotStrategy = enableIncrementalCheckpointing ?
new IncrementalSnapshotStrategy() :
@@ -1977,17 +1981,17 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
LocalRecoveryDirectoryProvider 
directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
File directory = 
directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
 
-   if (directory.exists()) {
-   FileUtils.deleteDirectory(directory);
-   }
-
-   if (!directory.mkdirs()) {
+   if (!directory.exists() && !directory.mkdirs()) 
{
throw new IOException("Local state base 
directory for checkpoint " + checkpointId +
" already exists: " + 
directory);
}
 
// introduces an extra directory because 
RocksDB wants a non-existing directory for native checkpoints.
-   File rdbSnapshotDir = new File(directory, 
"rocks_db");
+   // append localDirectoryName here to solve 
directory collision problem when two stateful operators chained in one task.
+   File rdbSnapshotDir = new File(directory, 
localDirectoryName);
+   if (rdbSnapshotDir.exists()) {
+   
FileUtils.deleteDirectory(rdbSnapshotDir);
+   }
Path path = new Path(rdbSnapshotDir.toURI());
// create a "permanent" snapshot directory 
because local recovery is active.
snapshotDirectory = 
SnapshotDirectory.permanent(path);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 89a4f81..0a7efda 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/run
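
The core of the fix is visible in the RocksDBKeyedStateBackend hunk above: each backend derives a unique sub-directory name from its backendUID and snapshots into that folder, so two chained keyed operators in one task no longer write to, or delete, the same local directory. A condensed sketch of that naming scheme, with the surrounding backend code omitted; the object and method names here are placeholders:

    import java.io.File
    import java.util.UUID

    object LocalSnapshotDirs {
      // One UUID per keyed state backend instance; dashes stripped as in the commit.
      private val backendUID = UUID.randomUUID()
      private val localDirectoryName = backendUID.toString.replaceAll("[\\-]", "")

      // Each backend snapshots into its own sub-directory of the subtask-specific
      // checkpoint directory, so chained keyed operators no longer collide.
      def rdbSnapshotDir(subtaskCheckpointDirectory: File): File =
        new File(subtaskCheckpointDirectory, localDirectoryName)
    }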