[flink] branch master updated: [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert between internal data format and java format

2019-03-05 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e2579e3  [FLINK-11827][table-runtime-blink] Introduce 
DataFormatConverters to convert between internal data format and java format
e2579e3 is described below

commit e2579e39602ab7d3e906a185353dd413aca58317
Author: JingsongLi 
AuthorDate: Tue Mar 5 16:36:10 2019 +0800

[FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to 
convert between internal data format and java format

This closes #7904
---
 .../apache/flink/table/dataformat/BinaryArray.java |  43 +
 .../table/dataformat/DataFormatConverters.java | 979 +
 .../table/runtime/functions/DateTimeUtils.java |  96 ++
 .../table/dataformat/DataFormatConvertersTest.java | 180 
 4 files changed, 1298 insertions(+)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index 57ce04b..de9d6ee 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.dataformat;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
 import org.apache.flink.table.util.SegmentsUtil;
 
 import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
@@ -40,6 +42,7 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
private static final int BYTE_ARRAY_BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
private static final int BOOLEAN_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(boolean[].class);
private static final int SHORT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(short[].class);
+   private static final int CHAR_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(char[].class);
private static final int INT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(int[].class);
private static final int LONG_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(long[].class);
private static final int FLOAT_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(float[].class);
@@ -49,6 +52,34 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
return 4 + ((numFields + 31) / 32) * 4;
}
 
+   /**
+* It store real value when type is primitive.
+* It store the length and offset of variable-length part when type is 
string, map, etc.
+*/
+   public static int calculateFixLengthPartSize(InternalType type) {
+   if (type.equals(InternalTypes.BOOLEAN)) {
+   return 1;
+   } else if (type.equals(InternalTypes.BYTE)) {
+   return 1;
+   } else if (type.equals(InternalTypes.SHORT)) {
+   return 2;
+   } else if (type.equals(InternalTypes.INT)) {
+   return 4;
+   } else if (type.equals(InternalTypes.FLOAT)) {
+   return 4;
+   } else if (type.equals(InternalTypes.CHAR)) {
+   return 2;
+   } else if (type.equals(InternalTypes.DATE)) {
+   return 4;
+   } else if (type.equals(InternalTypes.TIME)) {
+   return 4;
+   } else {
+   // long, double is 8 bytes.
+   // It store the length and offset of variable-length 
part when type is string, map, etc.
+   return 8;
+   }
+   }
+
// The number of elements in this array
private int numElements;
 
@@ -315,6 +346,14 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
return values;
}
 
+   public char[] toCharArray() {
+   checkNoNull();
+   char[] values = new char[numElements];
+   SegmentsUtil.copyToUnsafe(
+   segments, elementOffset, values, 
CHAR_ARRAY_OFFSET, numElements * 2);
+   return values;
+   }
+
public int[] toIntArray() {
checkNoNull();
int[] values = new int[numElements];
@@ -393,6 +432,10 @@ public class BinaryArray extends BinaryFormat implements 
TypeGetterSetters {
return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 
2);
}
 
+   public static BinaryArray fromPrimitiveArray(char[] arr) {
+   return 

[flink] branch master updated: [FLINK-11826] Ignore flaky testRateLimitedConsumer

2019-03-05 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 56afa2e  [FLINK-11826] Ignore flaky testRateLimitedConsumer
56afa2e is described below

commit 56afa2e32a841922417e4ed02282d96c430875c2
Author: Thomas Weise 
AuthorDate: Tue Mar 5 12:37:09 2019 -0800

[FLINK-11826] Ignore flaky testRateLimitedConsumer
---
 .../java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index cd73872..5a4e775 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -166,6 +167,7 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 * a desired rate of 3 bytes / second. Based on the execution time, the 
test asserts that this rate was not surpassed.
 * If no rate limiter is set on the consumer, the test should fail.
 */
+   @Ignore
	@Test(timeout = 60000)
public void testRateLimitedConsumer() throws Exception {
final String testTopic = "testRateLimitedConsumer";



[flink] 03/05: [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80efb71c3b4f8075aeb1a9e4f2b15234af016394
Author: Till Rohrmann 
AuthorDate: Mon Mar 4 10:43:00 2019 +0100

[hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest
---
 .../leaderelection/ZooKeeperLeaderRetrievalTest.java   | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0282a4f..42c1eca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -35,12 +32,12 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -50,8 +47,13 @@ import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the ZooKeeper based leader election and retrieval.
+ */
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
private TestingServer testingServer;
@@ -79,7 +81,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@After
public void after() throws Exception {
-   if(testingServer != null) {
+   if (testingServer != null) {
testingServer.stop();
 
testingServer = null;



[flink] 02/05: [hotfix] Fix checkstyle violations in ZooKeeperUtils

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 622766f71157f7e7faa8600e00d0c63139c5343d
Author: Till Rohrmann 
AuthorDate: Fri Mar 1 15:26:09 2019 +0100

[hotfix] Fix checkstyle violations in ZooKeeperUtils
---
 .../src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java| 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 86b7e6b..ba253c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -169,8 +169,7 @@ public class ZooKeeperUtils {
 */
public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
final CuratorFramework client,
-   final Configuration configuration) throws Exception
-   {
+   final Configuration configuration) throws Exception {
return createLeaderRetrievalService(client, configuration, "");
}
 



[flink] 05/05: [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f53594d8aad42d1de44fa8a33acc043cbd4c1d57
Author: Till Rohrmann 
AuthorDate: Fri Mar 1 16:26:48 2019 +0100

[FLINK-11336][zk] Delete ZNodes when 
ZooKeeperHaServices#closeAndCleanupAllData

When calling ZooKeeperHaServices#closeAndCleanupAllData we should delete the
HA_CLUSTER_ID znode which is owned by the respective ZooKeeperHaServices.
Moreover, the method tries to go up the chain of parent znodes and tries to
delete all empty parent nodes. This should clean up otherwisely orphaned
parent znodes.
---
 .../zookeeper/ZooKeeperHaServices.java |  62 +
 .../zookeeper/ZooKeeperHaServicesTest.java | 250 +
 .../ZooKeeperLeaderRetrievalTest.java  |  12 +-
 3 files changed, 318 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 8b33c2c..1b2ff44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,9 +34,13 @@ import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
@@ -224,6 +228,12 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
exception = t;
}
 
+   try {
+   cleanupZooKeeperPaths();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+   }
+
internalClose();
 
if (exception != null) {
@@ -232,6 +242,58 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
}
 
/**
+* Cleans up leftover ZooKeeper paths.
+*/
+   private void cleanupZooKeeperPaths() throws Exception {
+   deleteOwnedZNode();
+   tryDeleteEmptyParentZNodes();
+   }
+
+   private void deleteOwnedZNode() throws Exception {
+   // delete the HA_CLUSTER_ID znode which is owned by this cluster
+   client.delete().deletingChildrenIfNeeded().forPath("/");
+   }
+
+   /**
+* Tries to delete empty parent znodes.
+*
+* IMPORTANT: This method can be removed once all supported 
ZooKeeper versions
+* support the container {@link org.apache.zookeeper.CreateMode}.
+*
+* @throws Exception if the deletion fails for other reason than {@link 
KeeperException.NotEmptyException}
+*/
+   private void tryDeleteEmptyParentZNodes() throws Exception {
+   // try to delete the parent znodes if they are empty
+   String remainingPath = 
getParentPath(getNormalizedPath(client.getNamespace()));
+   final CuratorFramework nonNamespaceClient = 
client.usingNamespace(null);
+
+   while (!isRootPath(remainingPath)) {
+   try {
+   
nonNamespaceClient.delete().forPath(remainingPath);
+   } catch (KeeperException.NotEmptyException ignored) {
+   // We can only delete empty znodes
+   break;
+   }
+
+   remainingPath = getParentPath(remainingPath);
+   }
+   }
+
+   private static boolean isRootPath(String remainingPath) {
+   return ZKPaths.PATH_SEPARATOR.equals(remainingPath);
+   }
+
+   @Nonnull
+   private static String getNormalizedPath(String path) {
+   return ZKPaths.makePath(path, "");
+   }
+
+   @Nonnull
+   private static String getParentPath(String path) {
+   return ZKPaths.getPathAndNode(path).getPath();
+   }
+
+   /**
 * Closes components which don't distinguish between close and 
closeAndCleanupAllData.
 */
private void internalClose() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
new file mode 100644
index 000..0edfdb9
--- /dev/null
+++ 

[flink] branch release-1.8 updated (ce2d65d -> f53594d)

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ce2d65d  [FLINK-11796] [State Backends] Remove Snapshotable interface
 new d7386f1  [hotfix] Fix checkstyle violations in ZooKeeperHaServices
 new 622766f  [hotfix] Fix checkstyle violations in ZooKeeperUtils
 new 80efb71  [hotfix] Fix checkstyle violations in 
ZooKeeperLeaderRetrievalTest
 new 34d634e  [hotfix][tests] Speed up 
ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress
 new f53594d  [FLINK-11336][zk] Delete ZNodes when 
ZooKeeperHaServices#closeAndCleanupAllData

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../zookeeper/ZooKeeperHaServices.java |  91 ++--
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   3 +-
 .../zookeeper/ZooKeeperHaServicesTest.java | 250 +
 .../ZooKeeperLeaderRetrievalTest.java  |  26 ++-
 4 files changed, 341 insertions(+), 29 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java



[flink] 01/05: [hotfix] Fix checkstyle violations in ZooKeeperHaServices

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d7386f1697c17bf9c03bb162aed9fd5fbca8b8d6
Author: Till Rohrmann 
AuthorDate: Fri Mar 1 12:55:40 2019 +0100

[hotfix] Fix checkstyle violations in ZooKeeperHaServices
---
 .../zookeeper/ZooKeeperHaServices.java | 29 +++---
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 114d281..8b33c2c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -45,7 +45,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * An implementation of the {@link HighAvailabilityServices} using Apache 
ZooKeeper.
  * The services store data in ZooKeeper's nodes as illustrated by the 
following tree structure:
- * 
+ *
  * 
  * /flink
  *  +/cluster_id_1/resource_manager_lock
@@ -56,7 +56,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *  || /latest-2
  *  ||
  *  |+/job-id-2/job_manager_lock
- *  |  
+ *  |
  *  +/cluster_id_2/resource_manager_lock
  *   |
  *   +/job-id-1/job_manager_lock
@@ -64,18 +64,18 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *|/latest-1
  *|/persisted_job_graph
  * 
- * 
+ *
  * The root path "/flink" is configurable via the option {@link 
HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
  * This makes sure Flink stores its data under specific subtrees in ZooKeeper, 
for example to
  * accommodate specific permission.
- * 
- * The "cluster_id" part identifies the data stored for a specific Flink 
"cluster". 
+ *
+ * The "cluster_id" part identifies the data stored for a specific Flink 
"cluster".
  * This "cluster" can be either a standalone or containerized Flink cluster, 
or it can be job
  * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
- * 
+ *
  * In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is 
generated and configured
  * automatically by the client or dispatcher that submits the Job to YARN or 
Mesos.
- * 
+ *
  * In the case of a standalone cluster, that cluster-id needs to be 
configured via
  * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same 
cluster id will join the same
  * cluster and participate in the execution of the same set of jobs.
@@ -93,21 +93,20 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
private static final String REST_SERVER_LEADER_PATH = 
"/rest_server_lock";
 
// 

-   
-   
-   /** The ZooKeeper client to use */
+
+   /** The ZooKeeper client to use. */
private final CuratorFramework client;
 
-   /** The executor to run ZooKeeper callbacks on */
+   /** The executor to run ZooKeeper callbacks on. */
private final Executor executor;
 
-   /** The runtime configuration */
+   /** The runtime configuration. */
private final Configuration configuration;
 
-   /** The zookeeper based running jobs registry */
+   /** The zookeeper based running jobs registry. */
private final RunningJobsRegistry runningJobsRegistry;
 
-   /** Store for arbitrary blobs */
+   /** Store for arbitrary blobs. */
private final BlobStoreService blobStoreService;
 
public ZooKeeperHaServices(
@@ -233,7 +232,7 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
}
 
/**
-* Closes components which don't distinguish between close and 
closeAndCleanupAllData
+* Closes components which don't distinguish between close and 
closeAndCleanupAllData.
 */
private void internalClose() {
client.close();



[flink] 04/05: [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34d634e961ff1cc249deff0c9123e4fcda67d30f
Author: Till Rohrmann 
AuthorDate: Mon Mar 4 10:50:54 2019 +0100

[hotfix][tests] Speed up 
ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress

Decrease the timeout from 10s to 1s in testTimeoutOfFindConnectingAddress 
to speed up the test.
---
 .../flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 42c1eca..4a1cf80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -197,7 +197,7 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
 */
@Test
public void testTimeoutOfFindConnectingAddress() throws Exception {
-   FiniteDuration timeout = new FiniteDuration(10L, 
TimeUnit.SECONDS);
+   FiniteDuration timeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
 
LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
InetAddress result = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);



[flink] 04/05: [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f9e3a3f8ba9ace28e961484534217e6fc8a5309
Author: Till Rohrmann 
AuthorDate: Mon Mar 4 10:50:54 2019 +0100

[hotfix][tests] Speed up 
ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress

Decrease the timeout from 10s to 1s in testTimeoutOfFindConnectingAddress 
to speed up the test.
---
 .../flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 42c1eca..4a1cf80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -197,7 +197,7 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
 */
@Test
public void testTimeoutOfFindConnectingAddress() throws Exception {
-   FiniteDuration timeout = new FiniteDuration(10L, 
TimeUnit.SECONDS);
+   FiniteDuration timeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
 
LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
InetAddress result = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);



[flink] branch master updated (e8daa49 -> 11a8234)

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e8daa49  [FLINK-11796] [State Backends] Remove Snapshotable interface
 new f60b878  [hotfix] Fix checkstyle violations in ZooKeeperHaServices
 new f9e352b  [hotfix] Fix checkstyle violations in ZooKeeperUtils
 new c4786e8  [hotfix] Fix checkstyle violations in 
ZooKeeperLeaderRetrievalTest
 new 3f9e3a3  [hotfix][tests] Speed up 
ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress
 new 11a8234  [FLINK-11336][zk] Delete ZNodes when 
ZooKeeperHaServices#closeAndCleanupAllData

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../zookeeper/ZooKeeperHaServices.java |  91 ++--
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   3 +-
 .../zookeeper/ZooKeeperHaServicesTest.java | 250 +
 .../ZooKeeperLeaderRetrievalTest.java  |  26 ++-
 4 files changed, 341 insertions(+), 29 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java



[flink] 01/05: [hotfix] Fix checkstyle violations in ZooKeeperHaServices

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f60b8787156f42f07e54f8facf14c8c28982d22e
Author: Till Rohrmann 
AuthorDate: Fri Mar 1 12:55:40 2019 +0100

[hotfix] Fix checkstyle violations in ZooKeeperHaServices
---
 .../zookeeper/ZooKeeperHaServices.java | 29 +++---
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 114d281..8b33c2c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -45,7 +45,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * An implementation of the {@link HighAvailabilityServices} using Apache 
ZooKeeper.
  * The services store data in ZooKeeper's nodes as illustrated by the 
following tree structure:
- * 
+ *
  * 
  * /flink
  *  +/cluster_id_1/resource_manager_lock
@@ -56,7 +56,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *  || /latest-2
  *  ||
  *  |+/job-id-2/job_manager_lock
- *  |  
+ *  |
  *  +/cluster_id_2/resource_manager_lock
  *   |
  *   +/job-id-1/job_manager_lock
@@ -64,18 +64,18 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *|/latest-1
  *|/persisted_job_graph
  * 
- * 
+ *
  * The root path "/flink" is configurable via the option {@link 
HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
  * This makes sure Flink stores its data under specific subtrees in ZooKeeper, 
for example to
  * accommodate specific permission.
- * 
- * The "cluster_id" part identifies the data stored for a specific Flink 
"cluster". 
+ *
+ * The "cluster_id" part identifies the data stored for a specific Flink 
"cluster".
  * This "cluster" can be either a standalone or containerized Flink cluster, 
or it can be job
  * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
- * 
+ *
  * In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is 
generated and configured
  * automatically by the client or dispatcher that submits the Job to YARN or 
Mesos.
- * 
+ *
  * In the case of a standalone cluster, that cluster-id needs to be 
configured via
  * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same 
cluster id will join the same
  * cluster and participate in the execution of the same set of jobs.
@@ -93,21 +93,20 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
private static final String REST_SERVER_LEADER_PATH = 
"/rest_server_lock";
 
// 

-   
-   
-   /** The ZooKeeper client to use */
+
+   /** The ZooKeeper client to use. */
private final CuratorFramework client;
 
-   /** The executor to run ZooKeeper callbacks on */
+   /** The executor to run ZooKeeper callbacks on. */
private final Executor executor;
 
-   /** The runtime configuration */
+   /** The runtime configuration. */
private final Configuration configuration;
 
-   /** The zookeeper based running jobs registry */
+   /** The zookeeper based running jobs registry. */
private final RunningJobsRegistry runningJobsRegistry;
 
-   /** Store for arbitrary blobs */
+   /** Store for arbitrary blobs. */
private final BlobStoreService blobStoreService;
 
public ZooKeeperHaServices(
@@ -233,7 +232,7 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
}
 
/**
-* Closes components which don't distinguish between close and 
closeAndCleanupAllData
+* Closes components which don't distinguish between close and 
closeAndCleanupAllData.
 */
private void internalClose() {
client.close();



[flink] 05/05: [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 11a8234b50586d476371c64f9d751e8e65fdad0a
Author: Till Rohrmann 
AuthorDate: Fri Mar 1 16:26:48 2019 +0100

[FLINK-11336][zk] Delete ZNodes when 
ZooKeeperHaServices#closeAndCleanupAllData

When calling ZooKeeperHaServices#closeAndCleanupAllData we should delete the
HA_CLUSTER_ID znode which is owned by the respective ZooKeeperHaServices.
Moreover, the method tries to go up the chain of parent znodes and tries to
delete all empty parent nodes. This should clean up otherwisely orphaned
parent znodes.

This closes #7880.
---
 .../zookeeper/ZooKeeperHaServices.java |  62 +
 .../zookeeper/ZooKeeperHaServicesTest.java | 250 +
 .../ZooKeeperLeaderRetrievalTest.java  |  12 +-
 3 files changed, 318 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 8b33c2c..1b2ff44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,9 +34,13 @@ import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
@@ -224,6 +228,12 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
exception = t;
}
 
+   try {
+   cleanupZooKeeperPaths();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+   }
+
internalClose();
 
if (exception != null) {
@@ -232,6 +242,58 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
}
 
/**
+* Cleans up leftover ZooKeeper paths.
+*/
+   private void cleanupZooKeeperPaths() throws Exception {
+   deleteOwnedZNode();
+   tryDeleteEmptyParentZNodes();
+   }
+
+   private void deleteOwnedZNode() throws Exception {
+   // delete the HA_CLUSTER_ID znode which is owned by this cluster
+   client.delete().deletingChildrenIfNeeded().forPath("/");
+   }
+
+   /**
+* Tries to delete empty parent znodes.
+*
+* IMPORTANT: This method can be removed once all supported 
ZooKeeper versions
+* support the container {@link org.apache.zookeeper.CreateMode}.
+*
+* @throws Exception if the deletion fails for other reason than {@link 
KeeperException.NotEmptyException}
+*/
+   private void tryDeleteEmptyParentZNodes() throws Exception {
+   // try to delete the parent znodes if they are empty
+   String remainingPath = 
getParentPath(getNormalizedPath(client.getNamespace()));
+   final CuratorFramework nonNamespaceClient = 
client.usingNamespace(null);
+
+   while (!isRootPath(remainingPath)) {
+   try {
+   
nonNamespaceClient.delete().forPath(remainingPath);
+   } catch (KeeperException.NotEmptyException ignored) {
+   // We can only delete empty znodes
+   break;
+   }
+
+   remainingPath = getParentPath(remainingPath);
+   }
+   }
+
+   private static boolean isRootPath(String remainingPath) {
+   return ZKPaths.PATH_SEPARATOR.equals(remainingPath);
+   }
+
+   @Nonnull
+   private static String getNormalizedPath(String path) {
+   return ZKPaths.makePath(path, "");
+   }
+
+   @Nonnull
+   private static String getParentPath(String path) {
+   return ZKPaths.getPathAndNode(path).getPath();
+   }
+
+   /**
 * Closes components which don't distinguish between close and 
closeAndCleanupAllData.
 */
private void internalClose() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
new file mode 100644
index 000..0edfdb9
--- /dev/null
+++ 

[flink] 03/05: [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c4786e8519c6394ffccadd661c8462108c70cdc6
Author: Till Rohrmann 
AuthorDate: Mon Mar 4 10:43:00 2019 +0100

[hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest
---
 .../leaderelection/ZooKeeperLeaderRetrievalTest.java   | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0282a4f..42c1eca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -35,12 +32,12 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -50,8 +47,13 @@ import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the ZooKeeper based leader election and retrieval.
+ */
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
private TestingServer testingServer;
@@ -79,7 +81,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@After
public void after() throws Exception {
-   if(testingServer != null) {
+   if (testingServer != null) {
testingServer.stop();
 
testingServer = null;



[flink] 02/05: [hotfix] Fix checkstyle violations in ZooKeeperUtils

2019-03-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9e352b413d30972d01ccfdd7d70e7cf3843803c
Author: Till Rohrmann 
AuthorDate: Fri Mar 1 15:26:09 2019 +0100

[hotfix] Fix checkstyle violations in ZooKeeperUtils
---
 .../src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java| 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 86b7e6b..ba253c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -169,8 +169,7 @@ public class ZooKeeperUtils {
 */
public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
final CuratorFramework client,
-   final Configuration configuration) throws Exception
-   {
+   final Configuration configuration) throws Exception {
return createLeaderRetrievalService(client, configuration, "");
}
 



[flink] 01/02: [FLINK-11731] [State Backends] Make DefaultOperatorStateBackend follow the builder pattern

2019-03-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f62b890624116cbe205cdcc9762e41f6a80c84f
Author: Yu Li 
AuthorDate: Sat Mar 2 11:19:21 2019 +0100

[FLINK-11731] [State Backends] Make DefaultOperatorStateBackend follow the 
builder pattern

This closes #7899.

(cherry picked from commit 94f84a5c7b4876dee5b34e6a3725da7ee06a607f)
---
 .../flink/streaming/tests/StubStateBackend.java|   9 +-
 .../flink/runtime/state/AbstractStateBackend.java  |   6 +-
 .../runtime/state/DefaultOperatorStateBackend.java | 486 +
 .../state/DefaultOperatorStateBackendBuilder.java  |  98 +
 ...efaultOperatorStateBackendSnapshotStrategy.java | 213 +
 .../state/OperatorStateRestoreOperation.java   | 219 ++
 .../runtime/state/PartitionableListState.java  | 135 ++
 .../apache/flink/runtime/state/StateBackend.java   |   8 +-
 .../runtime/state/filesystem/FsStateBackend.java   |  13 +-
 .../state/heap/HeapKeyedStateBackendBuilder.java   |   1 +
 .../runtime/state/memory/MemoryStateBackend.java   |  19 +-
 .../CheckpointSettingsSerializableTest.java|   6 +-
 .../runtime/state/OperatorStateBackendTest.java| 114 +++--
 .../state/StateBackendMigrationTestBase.java   |  13 +-
 .../runtime/state/ttl/mock/MockStateBackend.java   |   7 +-
 .../streaming/state/RocksDBStateBackend.java   |  19 +-
 .../operators/StreamTaskStateInitializerImpl.java  |  21 +-
 .../operators/BackendRestorerProcedureTest.java|  10 +-
 .../StreamTaskStateInitializerImplTest.java|   5 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |  97 ++--
 .../runtime/tasks/TestSpyWrapperStateBackend.java  |   8 +-
 .../test/streaming/runtime/StateBackendITCase.java |   5 +-
 23 files changed, 962 insertions(+), 556 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
index dec4f2d..0d0c895 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
@@ -96,7 +97,11 @@ final class StubStateBackend implements StateBackend {
}
 
@Override
-   public OperatorStateBackend createOperatorStateBackend(Environment env, 
String operatorIdentifier) throws Exception {
-   return backend.createOperatorStateBackend(env, 
operatorIdentifier);
+   public OperatorStateBackend createOperatorStateBackend(
+   Environment env,
+   String operatorIdentifier,
+   @Nonnull Collection stateHandles,
+   CloseableRegistry cancelStreamRegistry) throws Exception {
+   return backend.createOperatorStateBackend(env, 
operatorIdentifier, stateHandles, cancelStreamRegistry);
}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 2343d83..8d94898 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -71,6 +71,8 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
 
@Override
public abstract OperatorStateBackend createOperatorStateBackend(
-   Environment env,
-   String operatorIdentifier) throws Exception;
+   Environment env,
+   String operatorIdentifier,
+   @Nonnull Collection stateHandles,
+   CloseableRegistry cancelStreamRegistry) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index a3541b6..f6a0dba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 

[flink] branch release-1.8 updated (b027fa7 -> ce2d65d)

2019-03-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b027fa7  Fix version change expressions in releasing scripts
 new 2f62b89  [FLINK-11731] [State Backends] Make 
DefaultOperatorStateBackend follow the builder pattern
 new ce2d65d  [FLINK-11796] [State Backends] Remove Snapshotable interface

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/streaming/tests/StubStateBackend.java|   9 +-
 .../runtime/state/AbstractKeyedStateBackend.java   |   3 +-
 .../flink/runtime/state/AbstractStateBackend.java  |   6 +-
 .../runtime/state/DefaultOperatorStateBackend.java | 492 +
 .../state/DefaultOperatorStateBackendBuilder.java  |  98 
 ...efaultOperatorStateBackendSnapshotStrategy.java | 213 +
 .../flink/runtime/state/OperatorStateBackend.java  |   5 +-
 .../state/OperatorStateRestoreOperation.java   | 219 +
 .../runtime/state/PartitionableListState.java  | 135 ++
 .../apache/flink/runtime/state/Snapshotable.java   |  41 --
 .../apache/flink/runtime/state/StateBackend.java   |   8 +-
 .../runtime/state/filesystem/FsStateBackend.java   |  13 +-
 .../runtime/state/heap/HeapKeyedStateBackend.java  |   6 -
 .../state/heap/HeapKeyedStateBackendBuilder.java   |   1 +
 .../runtime/state/memory/MemoryStateBackend.java   |  19 +-
 .../CheckpointSettingsSerializableTest.java|   6 +-
 .../runtime/state/OperatorStateBackendTest.java| 114 +++--
 .../state/StateBackendMigrationTestBase.java   |  13 +-
 .../state/ttl/mock/MockKeyedStateBackend.java  |   7 -
 .../runtime/state/ttl/mock/MockStateBackend.java   |   7 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |   6 -
 .../streaming/state/RocksDBStateBackend.java   |  19 +-
 .../api/operators/BackendRestorerProcedure.java|  23 +-
 .../operators/StreamTaskStateInitializerImpl.java  |  21 +-
 .../operators/BackendRestorerProcedureTest.java|  10 +-
 .../StreamTaskStateInitializerImplTest.java|  11 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |  97 ++--
 .../runtime/tasks/TestSpyWrapperStateBackend.java  |   8 +-
 .../test/streaming/runtime/StateBackendITCase.java |   5 +-
 30 files changed, 968 insertions(+), 653 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendBuilder.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java



[flink] 02/02: [FLINK-11796] [State Backends] Remove Snapshotable interface

2019-03-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce2d65d88d28bcd5cf182395d511a239dacfb89d
Author: Yu Li 
AuthorDate: Sat Mar 2 20:00:43 2019 +0100

[FLINK-11796] [State Backends] Remove Snapshotable interface

(cherry picked from commit e8daa49a593edc401cd44761b25b1324b11be4a6)
---
 .../runtime/state/AbstractKeyedStateBackend.java   |  3 +-
 .../runtime/state/DefaultOperatorStateBackend.java |  8 +
 .../flink/runtime/state/OperatorStateBackend.java  |  5 ++-
 .../apache/flink/runtime/state/Snapshotable.java   | 41 --
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  6 
 .../state/ttl/mock/MockKeyedStateBackend.java  |  7 
 .../streaming/state/RocksDBKeyedStateBackend.java  |  6 
 .../api/operators/BackendRestorerProcedure.java| 23 ++--
 .../StreamTaskStateInitializerImplTest.java|  6 
 9 files changed, 7 insertions(+), 98 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 98e21ff..e28aeef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -52,7 +51,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class AbstractKeyedStateBackend implements
KeyedStateBackend,
-   Snapshotable, 
Collection>,
+   SnapshotStrategy>,
Closeable,
CheckpointListener {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index f6a0dba..48c8eb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -39,7 +39,6 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.RunnableFuture;
@@ -246,13 +245,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
}
 
// 
---
-   //  Snapshot and restore
+   //  Snapshot
// 
---
-
-   public void restore(Collection restoreSnapshots) 
throws Exception {
-   // all restore work done in builder and nothing to do here
-   }
-
@Nonnull
@Override
public RunnableFuture> snapshot(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 3cbb351..4fb8024 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -22,16 +22,15 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.util.Disposable;
 
 import java.io.Closeable;
-import java.util.Collection;
 
 /**
  * Interface that combines both, the user facing {@link OperatorStateStore} 
interface and the system interface
- * {@link Snapshotable}
+ * {@link SnapshotStrategy}
  *
  */
 public interface OperatorStateBackend extends
OperatorStateStore,
-   Snapshotable, 
Collection>,
+   SnapshotStrategy>,
Closeable,
Disposable {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
deleted file mode 100644
index 1677855..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * 

[flink] branch master updated (fb256d4 -> e8daa49)

2019-03-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from fb256d4  [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory 
for Blink (#7898)
 new 94f84a5  [FLINK-11731] [State Backends] Make 
DefaultOperatorStateBackend follow the builder pattern
 new e8daa49  [FLINK-11796] [State Backends] Remove Snapshotable interface

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/streaming/tests/StubStateBackend.java|   9 +-
 .../runtime/state/AbstractKeyedStateBackend.java   |   3 +-
 .../flink/runtime/state/AbstractStateBackend.java  |   6 +-
 .../runtime/state/DefaultOperatorStateBackend.java | 492 +
 .../state/DefaultOperatorStateBackendBuilder.java  |  98 
 ...efaultOperatorStateBackendSnapshotStrategy.java | 213 +
 .../flink/runtime/state/OperatorStateBackend.java  |   5 +-
 .../state/OperatorStateRestoreOperation.java   | 219 +
 .../runtime/state/PartitionableListState.java  | 135 ++
 .../apache/flink/runtime/state/Snapshotable.java   |  41 --
 .../apache/flink/runtime/state/StateBackend.java   |   8 +-
 .../runtime/state/filesystem/FsStateBackend.java   |  13 +-
 .../runtime/state/heap/HeapKeyedStateBackend.java  |   6 -
 .../state/heap/HeapKeyedStateBackendBuilder.java   |   1 +
 .../runtime/state/memory/MemoryStateBackend.java   |  19 +-
 .../CheckpointSettingsSerializableTest.java|   6 +-
 .../runtime/state/OperatorStateBackendTest.java| 114 +++--
 .../state/StateBackendMigrationTestBase.java   |  13 +-
 .../state/ttl/mock/MockKeyedStateBackend.java  |   7 -
 .../runtime/state/ttl/mock/MockStateBackend.java   |   7 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |   6 -
 .../streaming/state/RocksDBStateBackend.java   |  19 +-
 .../api/operators/BackendRestorerProcedure.java|  23 +-
 .../operators/StreamTaskStateInitializerImpl.java  |  21 +-
 .../operators/BackendRestorerProcedureTest.java|  10 +-
 .../StreamTaskStateInitializerImplTest.java|  11 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |  97 ++--
 .../runtime/tasks/TestSpyWrapperStateBackend.java  |   8 +-
 .../test/streaming/runtime/StateBackendITCase.java |   5 +-
 30 files changed, 968 insertions(+), 653 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendBuilder.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java



[flink] 02/02: [FLINK-11796] [State Backends] Remove Snapshotable interface

2019-03-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e8daa49a593edc401cd44761b25b1324b11be4a6
Author: Yu Li 
AuthorDate: Sun Mar 3 03:00:43 2019 +0800

[FLINK-11796] [State Backends] Remove Snapshotable interface
---
 .../runtime/state/AbstractKeyedStateBackend.java   |  3 +-
 .../runtime/state/DefaultOperatorStateBackend.java |  8 +
 .../flink/runtime/state/OperatorStateBackend.java  |  5 ++-
 .../apache/flink/runtime/state/Snapshotable.java   | 41 --
 .../runtime/state/heap/HeapKeyedStateBackend.java  |  6 
 .../state/ttl/mock/MockKeyedStateBackend.java  |  7 
 .../streaming/state/RocksDBKeyedStateBackend.java  |  6 
 .../api/operators/BackendRestorerProcedure.java| 23 ++--
 .../StreamTaskStateInitializerImplTest.java|  6 
 9 files changed, 7 insertions(+), 98 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 98e21ff..e28aeef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -52,7 +51,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public abstract class AbstractKeyedStateBackend implements
KeyedStateBackend,
-   Snapshotable, 
Collection>,
+   SnapshotStrategy>,
Closeable,
CheckpointListener {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index f6a0dba..48c8eb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -39,7 +39,6 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.RunnableFuture;
@@ -246,13 +245,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
}
 
// 
---
-   //  Snapshot and restore
+   //  Snapshot
// 
---
-
-   public void restore(Collection restoreSnapshots) 
throws Exception {
-   // all restore work done in builder and nothing to do here
-   }
-
@Nonnull
@Override
public RunnableFuture> snapshot(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 3cbb351..4fb8024 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -22,16 +22,15 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.util.Disposable;
 
 import java.io.Closeable;
-import java.util.Collection;
 
 /**
  * Interface that combines both, the user facing {@link OperatorStateStore} 
interface and the system interface
- * {@link Snapshotable}
+ * {@link SnapshotStrategy}
  *
  */
 public interface OperatorStateBackend extends
OperatorStateStore,
-   Snapshotable, 
Collection>,
+   SnapshotStrategy>,
Closeable,
Disposable {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
deleted file mode 100644
index 1677855..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable 

[flink] branch master updated: [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory for Blink (#7898)

2019-03-05 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new fb256d4  [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory 
for Blink (#7898)
fb256d4 is described below

commit fb256d48e52c490b6a899c17c1225d30f7a6c92f
Author: Jingsong Lee 
AuthorDate: Tue Mar 5 21:17:15 2019 +0800

[FLINK-11803][table-planner-blink] Improve FlinkTypeFactory for Blink 
(#7898)
---
 .../flink/table/calcite/FlinkTypeFactory.scala | 167 -
 .../flink/table/calcite/FlinkTypeSystem.scala  |  51 +++
 .../flink/table/plan/schema/ArrayRelDataType.scala |  55 +++
 .../flink/table/plan/schema/InlineTable.scala  |  18 +--
 .../flink/table/plan/schema/MapRelDataType.scala   |  50 ++
 .../flink/table/plan/schema/RowRelDataType.scala   |  89 +++
 .../apache/flink/table/plan/schema/RowSchema.scala |  72 +
 .../flink/table/calcite/FlinkTypeFactoryTest.scala |  75 +
 8 files changed, 497 insertions(+), 80 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index ba52152..0ba2252 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,28 +18,22 @@
 
 package org.apache.flink.table.calcite
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.ValueTypeInfo._
-import org.apache.flink.table.`type`.DecimalType
+import org.apache.flink.table.`type`.{ArrayType, DecimalType, InternalType, 
InternalTypes, MapType, RowType}
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
-import org.apache.flink.table.typeutils._
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, 
RowRelDataType, RowSchema}
 
-import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
-import org.apache.calcite.sql.SqlIntervalQualifier
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.parser.SqlParserPos
 
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
-  * Flink specific type factory that represents the interface between Flink's 
[[TypeInformation]]
+  * Flink specific type factory that represents the interface between Flink's 
[[InternalType]]
   * and Calcite's [[RelDataType]].
   */
 class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImpl(typeSystem) {
@@ -47,39 +41,51 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
-case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => false
-case _: BasicTypeInfo[_] => false
-case _: SqlTimeTypeInfo[_] => false
-case _: TimeIntervalTypeInfo[_] => false
-case _ => true
-  }
+  private val seenTypes = mutable.HashMap[(InternalType, Boolean), 
RelDataType]()
 
-  def createTypeFromTypeInfo(
-  typeInfo: TypeInformation[_],
+  def createTypeFromInternalType(
+  tp: InternalType,
   isNullable: Boolean): RelDataType = {
 
-// we cannot use seenTypes for simple types,
-// because time indicators and timestamps would be the same
-
-val relType = if (!isAdvanced(typeInfo)) {
-  // simple types can be converted to SQL types and vice versa
-  val sqlType = typeInfoToSqlTypeName(typeInfo)
-  sqlType match {
-
-case INTERVAL_YEAR_MONTH =>
-  createSqlIntervalType(
-new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, 
SqlParserPos.ZERO))
-
-case INTERVAL_DAY_SECOND =>
-  createSqlIntervalType(
-new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO))
-
-case _ =>
-  createSqlType(sqlType)
-  }
-} else {
-  throw new TableException("Unsupported now")
+val relType = seenTypes.get((tp, isNullable)) match {
+  case Some(retType: RelDataType) => retType
+  case None =>
+val refType = tp match {
+  case InternalTypes.BOOLEAN => createSqlType(BOOLEAN)
+  case InternalTypes.BYTE => createSqlType(TINYINT)
+  case InternalTypes.SHORT => 

[flink] branch release-1.7 updated (e3c0e69 -> 70bc26c)

2019-03-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e3c0e69  [FLINK-9003][E2E] Add operators with input type that goes 
through custom, stateful serialization
 new d099898  [hotfix][core] Fix Tuple0Serializer handling of null
 new 13b3e6b  [hotfix][core] Comparing whole collections rather than 
contents in KryoGenericTypeSerializerTest
 new c3b49ee  [hotfix][tests] Call all methods from SerializerTestBase in 
SerializerTestInstance by reflection
 new 54cf610  [FLINK-11420][core] Fixed duplicate method of 
TraversableSerializer
 new f4c0991  [hotfix][tests] Added matcher that performs deep comparison 
with taking Tuples into account
 new 35778da  [hotfix][core] Added snapshot for NothingSerializerSnapshot
 new 70bc26c  [FLINK-11823][core] Fixed duplicate method of TrySerializer

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/typeutils/runtime/Tuple0Serializer.java   |   2 +
 .../common/typeutils/CompositeSerializerTest.java  |   6 -
 .../api/common/typeutils/SerializerTestBase.java   |  82 --
 .../common/typeutils/SerializerTestInstance.java   |  55 +--
 .../java/typeutils/runtime/RowSerializerTest.java  |  14 +-
 .../typeutils/runtime/TupleSerializerTest.java |   3 +-
 .../runtime/TupleSerializerTestInstance.java   |  79 --
 .../flink/testutils/CustomEqualityMatcher.java |  70 +
 .../flink/testutils/DeeplyEqualsChecker.java   | 175 +
 .../valuearray/ByteValueArraySerializerTest.java   |   3 +-
 .../valuearray/CharValueArraySerializerTest.java   |   3 +-
 .../valuearray/DoubleValueArraySerializerTest.java |   3 +-
 .../valuearray/FloatValueArraySerializerTest.java  |   3 +-
 .../valuearray/IntValueArraySerializerTest.java|   3 +-
 .../valuearray/LongValueArraySerializerTest.java   |   3 +-
 .../valuearray/NullValueArraySerializerTest.java   |   3 +-
 .../valuearray/ShortValueArraySerializerTest.java  |   3 +-
 .../valuearray/StringValueArraySerializerTest.java |   3 +-
 .../valuearray/ValueArraySerializerTestBase.java   |  38 ++---
 .../api/scala/typeutils/NothingSerializer.scala|  10 +-
 .../scala/typeutils/TraversableSerializer.scala|   2 +-
 .../flink/api/scala/typeutils/TrySerializer.scala  |  17 +-
 .../runtime/KryoGenericTypeSerializerTest.scala|   8 +-
 .../runtime/ScalaSpecialTypesSerializerTest.scala  |  83 +++---
 .../scala/runtime/TraversableSerializerTest.scala  |  17 --
 .../runtime/TupleSerializerTestInstance.scala  |  81 +-
 26 files changed, 471 insertions(+), 298 deletions(-)
 delete mode 100644 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java
 create mode 100644 
flink-core/src/test/java/org/apache/flink/testutils/CustomEqualityMatcher.java
 create mode 100644 
flink-core/src/test/java/org/apache/flink/testutils/DeeplyEqualsChecker.java
 copy 
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringSerializerTest.java
 => 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArraySerializerTestBase.java
 (56%)



[flink] branch release-1.8 updated: Fix version change expressions in releasing scripts

2019-03-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new b027fa7  Fix version change expressions in releasing scripts
b027fa7 is described below

commit b027fa74fd677e59d0cffe6c74bc4915521a1c61
Author: Aljoscha Krettek 
AuthorDate: Tue Mar 5 11:42:44 2019 +0100

Fix version change expressions in releasing scripts

The earlier version had "\1${NEW_VERSION}" in there, which would resolve
to "\11.8.", i.e. the backreference would now be \11.
---
 tools/releasing/create_release_branch.sh | 2 +-
 tools/releasing/update_branch_version.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/releasing/create_release_branch.sh 
b/tools/releasing/create_release_branch.sh
index 9673e62..81619e7 100755
--- a/tools/releasing/create_release_branch.sh
+++ b/tools/releasing/create_release_branch.sh
@@ -57,7 +57,7 @@ fi
 git checkout -b $target_branch
 
 #change version in all pom files
-find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#'
 {} \;
+find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#'
 {} \;
 
 #change version of documentation
 cd docs
diff --git a/tools/releasing/update_branch_version.sh 
b/tools/releasing/update_branch_version.sh
index 10290d9..6f00093 100755
--- a/tools/releasing/update_branch_version.sh
+++ b/tools/releasing/update_branch_version.sh
@@ -49,7 +49,7 @@ fi
 cd ..
 
 #change version in all pom files
-find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#'
 {} \;
+find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#'
 {} \;
 
 #change version of documentation
 cd docs



[flink] branch master updated: Fix version change expressions in releasing scripts

2019-03-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 60bedd9  Fix version change expressions in releasing scripts
60bedd9 is described below

commit 60bedd9dcda3d10b15999b124375ea381a4eee2d
Author: Aljoscha Krettek 
AuthorDate: Tue Mar 5 11:42:44 2019 +0100

Fix version change expressions in releasing scripts

The earlier version had "\1${NEW_VERSION}" in there, which would resolve
to "\11.8.", i.e. the backreference would now be \11.
---
 tools/releasing/create_release_branch.sh | 2 +-
 tools/releasing/update_branch_version.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/releasing/create_release_branch.sh 
b/tools/releasing/create_release_branch.sh
index 9673e62..81619e7 100755
--- a/tools/releasing/create_release_branch.sh
+++ b/tools/releasing/create_release_branch.sh
@@ -57,7 +57,7 @@ fi
 git checkout -b $target_branch
 
 #change version in all pom files
-find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#'
 {} \;
+find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#'
 {} \;
 
 #change version of documentation
 cd docs
diff --git a/tools/releasing/update_branch_version.sh 
b/tools/releasing/update_branch_version.sh
index 10290d9..6f00093 100755
--- a/tools/releasing/update_branch_version.sh
+++ b/tools/releasing/update_branch_version.sh
@@ -49,7 +49,7 @@ fi
 cd ..
 
 #change version in all pom files
-find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#'
 {} \;
+find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#'
 {} \;
 
 #change version of documentation
 cd docs



[flink] branch release-1.8 updated: Fix create_release_branch.sh to accommodate Hadoop Versions

2019-03-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 571997b  Fix create_release_branch.sh to accommodate Hadoop Versions
571997b is described below

commit 571997b2e3e7b078d52f5e866d62a1c43050646c
Author: Aljoscha Krettek 
AuthorDate: Tue Mar 5 11:18:33 2019 +0100

Fix create_release_branch.sh to accommodate Hadoop Versions

Before, this would not correctly change version tags that have a version
prefix or suffix, like the shaded Hadoop modules.
---
 tools/releasing/create_release_branch.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/releasing/create_release_branch.sh 
b/tools/releasing/create_release_branch.sh
index 615722b..9673e62 100755
--- a/tools/releasing/create_release_branch.sh
+++ b/tools/releasing/create_release_branch.sh
@@ -57,7 +57,7 @@ fi
 git checkout -b $target_branch
 
 #change version in all pom files
-find . -name 'pom.xml' -type f -exec perl -pi -e 
's#'$OLD_VERSION'#'$NEW_VERSION'#' {} \;
+find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#'
 {} \;
 
 #change version of documentation
 cd docs



[flink] branch master updated: Fix create_release_branch.sh to accommodate Hadoop Versions

2019-03-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9d6f221  Fix create_release_branch.sh to accommodate Hadoop Versions
9d6f221 is described below

commit 9d6f221ba9b514a0929d0a0e429852d71b7d61f8
Author: Aljoscha Krettek 
AuthorDate: Tue Mar 5 11:18:33 2019 +0100

Fix create_release_branch.sh to accommodate Hadoop Versions

Before, this would not correctly change version tags that have a version
prefix or suffix, like the shaded Hadoop modules.
---
 tools/releasing/create_release_branch.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/releasing/create_release_branch.sh 
b/tools/releasing/create_release_branch.sh
index 615722b..9673e62 100755
--- a/tools/releasing/create_release_branch.sh
+++ b/tools/releasing/create_release_branch.sh
@@ -57,7 +57,7 @@ fi
 git checkout -b $target_branch
 
 #change version in all pom files
-find . -name 'pom.xml' -type f -exec perl -pi -e 
's#'$OLD_VERSION'#'$NEW_VERSION'#' {} \;
+find . -name 'pom.xml' -type f -exec perl -pi -e 
's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#'
 {} \;
 
 #change version of documentation
 cd docs



[flink] branch master updated: [hotfix][table-api] Add docs to over-windowed table's select()

2019-03-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a2a9c5c  [hotfix][table-api] Add docs to over-windowed table's select()
a2a9c5c is described below

commit a2a9c5c6398c8b3213ff07cd982b252b6c104053
Author: Timo Walther 
AuthorDate: Tue Mar 5 10:04:51 2019 +0100

[hotfix][table-api] Add docs to over-windowed table's select()
---
 .../scala/org/apache/flink/table/api/table.scala | 20 
 1 file changed, 20 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
index 8638650..7c0777c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
@@ -1228,6 +1228,16 @@ class OverWindowedTable(
 private[flink] val table: Table,
 private[flink] val overWindows: Array[OverWindow]) {
 
+  /**
+* Performs a selection operation on an over-windowed table. Similar to an 
SQL SELECT statement.
+* The field expressions can contain complex expressions and aggregations.
+*
+* Example:
+*
+* {{{
+*   overWindowedTable.select('c, 'b.count over 'ow, 'e.sum over 'ow)
+* }}}
+*/
   def select(fields: Expression*): Table = {
 val expandedFields = expandProjectList(
   fields,
@@ -1252,6 +1262,16 @@ class OverWindowedTable(
 )
   }
 
+  /**
+* Performs a selection operation on an over-windowed table. Similar to an 
SQL SELECT statement.
+* The field expressions can contain complex expressions and aggregations.
+*
+* Example:
+*
+* {{{
+*   overWindowedTable.select("c, b.count over ow, e.sum over ow")
+* }}}
+*/
   def select(fields: String): Table = {
 val fieldExprs = ExpressionParser.parseExpressionList(fields)
 //get the correct expression for AggFunctionCall



[flink] branch master updated: [hotfix][table-api] Remove deprecated table function code in insertInto

2019-03-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9297f61  [hotfix][table-api] Remove deprecated table function code in 
insertInto
9297f61 is described below

commit 9297f611c9df69e7b99652a218a1404e28d2afc7
Author: Timo Walther 
AuthorDate: Tue Mar 5 09:41:23 2019 +0100

[hotfix][table-api] Remove deprecated table function code in insertInto
---
 .../main/scala/org/apache/flink/table/api/table.scala| 16 ++--
 1 file changed, 2 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
index 44dac79..8638650 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
@@ -1025,13 +1025,7 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-this.logicalPlan match {
-  case _: LogicalTableFunctionCall =>
-throw new ValidationException("Table functions can only be used in 
table.joinLateral() " +
-  "and table.leftOuterJoinLateral().")
-  case _ =>
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
-}
+insertInto(tableName, tableEnv.queryConfig)
   }
 
   /**
@@ -1047,13 +1041,7 @@ class Table(
 * @param conf The [[QueryConfig]] to use.
 */
   def insertInto(tableName: String, conf: QueryConfig): Unit = {
-this.logicalPlan match {
-  case _: LogicalTableFunctionCall =>
-throw new ValidationException(
-  "Table functions can only be used for joinLateral() and 
leftOuterJoinLateral().")
-  case _ =>
-tableEnv.insertInto(this, tableName, conf)
-}
+tableEnv.insertInto(this, tableName, conf)
   }
 
   /**