[flink] branch master updated: [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert between internal data format and java format
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e2579e3 [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert between internal data format and java format e2579e3 is described below commit e2579e39602ab7d3e906a185353dd413aca58317 Author: JingsongLi AuthorDate: Tue Mar 5 16:36:10 2019 +0800 [FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert between internal data format and java format This closes #7904 --- .../apache/flink/table/dataformat/BinaryArray.java | 43 + .../table/dataformat/DataFormatConverters.java | 979 + .../table/runtime/functions/DateTimeUtils.java | 96 ++ .../table/dataformat/DataFormatConvertersTest.java | 180 4 files changed, 1298 insertions(+) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java index 57ce04b..de9d6ee 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java @@ -20,6 +20,8 @@ package org.apache.flink.table.dataformat; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.type.InternalTypes; import org.apache.flink.table.util.SegmentsUtil; import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; @@ -40,6 +42,7 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); private static final int BOOLEAN_ARRAY_OFFSET = 
UNSAFE.arrayBaseOffset(boolean[].class); private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class); + private static final int CHAR_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(char[].class); private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class); private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class); @@ -49,6 +52,34 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { return 4 + ((numFields + 31) / 32) * 4; } + /** +* It store real value when type is primitive. +* It store the length and offset of variable-length part when type is string, map, etc. +*/ + public static int calculateFixLengthPartSize(InternalType type) { + if (type.equals(InternalTypes.BOOLEAN)) { + return 1; + } else if (type.equals(InternalTypes.BYTE)) { + return 1; + } else if (type.equals(InternalTypes.SHORT)) { + return 2; + } else if (type.equals(InternalTypes.INT)) { + return 4; + } else if (type.equals(InternalTypes.FLOAT)) { + return 4; + } else if (type.equals(InternalTypes.CHAR)) { + return 2; + } else if (type.equals(InternalTypes.DATE)) { + return 4; + } else if (type.equals(InternalTypes.TIME)) { + return 4; + } else { + // long, double is 8 bytes. + // It store the length and offset of variable-length part when type is string, map, etc. 
+ return 8; + } + } + // The number of elements in this array private int numElements; @@ -315,6 +346,14 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { return values; } + public char[] toCharArray() { + checkNoNull(); + char[] values = new char[numElements]; + SegmentsUtil.copyToUnsafe( + segments, elementOffset, values, CHAR_ARRAY_OFFSET, numElements * 2); + return values; + } + public int[] toIntArray() { checkNoNull(); int[] values = new int[numElements]; @@ -393,6 +432,10 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2); } + public static BinaryArray fromPrimitiveArray(char[] arr) { + return
[flink] branch master updated: [FLINK-11826] Ignore flaky testRateLimitedConsumer
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 56afa2e [FLINK-11826] Ignore flaky testRateLimitedConsumer 56afa2e is described below commit 56afa2e32a841922417e4ed02282d96c430875c2 Author: Thomas Weise AuthorDate: Tue Mar 5 12:37:09 2019 -0800 [FLINK-11826] Ignore flaky testRateLimitedConsumer --- .../java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index cd73872..5a4e775 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -166,6 +167,7 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { * a desired rate of 3 bytes / second. Based on the execution time, the test asserts that this rate was not surpassed. * If no rate limiter is set on the consumer, the test should fail. */ + @Ignore @Test(timeout = 6) public void testRateLimitedConsumer() throws Exception { final String testTopic = "testRateLimitedConsumer";
[flink] 03/05: [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit 80efb71c3b4f8075aeb1a9e4f2b15234af016394 Author: Till Rohrmann AuthorDate: Mon Mar 4 10:43:00 2019 +0100 [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest --- .../leaderelection/ZooKeeperLeaderRetrievalTest.java | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 0282a4f..42c1eca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -18,9 +18,6 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.test.TestingServer; - import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.blob.VoidBlobStore; @@ -35,12 +32,12 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.TestingServer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; - import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -50,8 +47,13 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.FiniteDuration; + import static org.junit.Assert.assertEquals; +/** + * 
Tests for the ZooKeeper based leader election and retrieval. + */ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ private TestingServer testingServer; @@ -79,7 +81,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ @After public void after() throws Exception { - if(testingServer != null) { + if (testingServer != null) { testingServer.stop(); testingServer = null;
[flink] 02/05: [hotfix] Fix checkstyle violations in ZooKeeperUtils
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit 622766f71157f7e7faa8600e00d0c63139c5343d Author: Till Rohrmann AuthorDate: Fri Mar 1 15:26:09 2019 +0100 [hotfix] Fix checkstyle violations in ZooKeeperUtils --- .../src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java| 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 86b7e6b..ba253c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -169,8 +169,7 @@ public class ZooKeeperUtils { */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( final CuratorFramework client, - final Configuration configuration) throws Exception - { + final Configuration configuration) throws Exception { return createLeaderRetrievalService(client, configuration, ""); }
[flink] 05/05: [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit f53594d8aad42d1de44fa8a33acc043cbd4c1d57 Author: Till Rohrmann AuthorDate: Fri Mar 1 16:26:48 2019 +0100 [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData When calling ZooKeeperHaServices#closeAndCleanupAllData we should delete the HA_CLUSTER_ID znode which is owned by the respective ZooKeeperHaServices. Moreover, the method tries to go up the chain of parent znodes and tries to delete all empty parent nodes. This should clean up parent znodes that would otherwise be orphaned. --- .../zookeeper/ZooKeeperHaServices.java | 62 + .../zookeeper/ZooKeeperHaServicesTest.java | 250 + .../ZooKeeperLeaderRetrievalTest.java | 12 +- 3 files changed, 318 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index 8b33c2c..1b2ff44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -34,9 +34,13 @@ import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.concurrent.Executor; @@ -224,6 +228,12 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { exception = t; } + try { + cleanupZooKeeperPaths(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, 
exception); + } + internalClose(); if (exception != null) { @@ -232,6 +242,58 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { } /** +* Cleans up leftover ZooKeeper paths. +*/ + private void cleanupZooKeeperPaths() throws Exception { + deleteOwnedZNode(); + tryDeleteEmptyParentZNodes(); + } + + private void deleteOwnedZNode() throws Exception { + // delete the HA_CLUSTER_ID znode which is owned by this cluster + client.delete().deletingChildrenIfNeeded().forPath("/"); + } + + /** +* Tries to delete empty parent znodes. +* +* IMPORTANT: This method can be removed once all supported ZooKeeper versions +* support the container {@link org.apache.zookeeper.CreateMode}. +* +* @throws Exception if the deletion fails for other reason than {@link KeeperException.NotEmptyException} +*/ + private void tryDeleteEmptyParentZNodes() throws Exception { + // try to delete the parent znodes if they are empty + String remainingPath = getParentPath(getNormalizedPath(client.getNamespace())); + final CuratorFramework nonNamespaceClient = client.usingNamespace(null); + + while (!isRootPath(remainingPath)) { + try { + nonNamespaceClient.delete().forPath(remainingPath); + } catch (KeeperException.NotEmptyException ignored) { + // We can only delete empty znodes + break; + } + + remainingPath = getParentPath(remainingPath); + } + } + + private static boolean isRootPath(String remainingPath) { + return ZKPaths.PATH_SEPARATOR.equals(remainingPath); + } + + @Nonnull + private static String getNormalizedPath(String path) { + return ZKPaths.makePath(path, ""); + } + + @Nonnull + private static String getParentPath(String path) { + return ZKPaths.getPathAndNode(path).getPath(); + } + + /** * Closes components which don't distinguish between close and closeAndCleanupAllData. 
*/ private void internalClose() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java new file mode 100644 index 000..0edfdb9 --- /dev/null +++
[flink] branch release-1.8 updated (ce2d65d -> f53594d)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git. from ce2d65d [FLINK-11796] [State Backends] Remove Snapshotable interface new d7386f1 [hotfix] Fix checkstyle violations in ZooKeeperHaServices new 622766f [hotfix] Fix checkstyle violations in ZooKeeperUtils new 80efb71 [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest new 34d634e [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress new f53594d [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../zookeeper/ZooKeeperHaServices.java | 91 ++-- .../apache/flink/runtime/util/ZooKeeperUtils.java | 3 +- .../zookeeper/ZooKeeperHaServicesTest.java | 250 + .../ZooKeeperLeaderRetrievalTest.java | 26 ++- 4 files changed, 341 insertions(+), 29 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
[flink] 01/05: [hotfix] Fix checkstyle violations in ZooKeeperHaServices
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit d7386f1697c17bf9c03bb162aed9fd5fbca8b8d6 Author: Till Rohrmann AuthorDate: Fri Mar 1 12:55:40 2019 +0100 [hotfix] Fix checkstyle violations in ZooKeeperHaServices --- .../zookeeper/ZooKeeperHaServices.java | 29 +++--- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index 114d281..8b33c2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -45,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper. * The services store data in ZooKeeper's nodes as illustrated by the following tree structure: - * + * * * /flink * +/cluster_id_1/resource_manager_lock @@ -56,7 +56,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * || /latest-2 * || * |+/job-id-2/job_manager_lock - * | + * | * +/cluster_id_2/resource_manager_lock * | * +/job-id-1/job_manager_lock @@ -64,18 +64,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull; *|/latest-1 *|/persisted_job_graph * - * + * * The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to * accommodate specific permission. - * - * The "cluster_id" part identifies the data stored for a specific Flink "cluster". 
+ * + * The "cluster_id" part identifies the data stored for a specific Flink "cluster". * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job * on a framework like YARN or Mesos (in a "per-job-cluster" mode). - * + * * In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured * automatically by the client or dispatcher that submits the Job to YARN or Mesos. - * + * * In the case of a standalone cluster, that cluster-id needs to be configured via * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same * cluster and participate in the execution of the same set of jobs. @@ -93,21 +93,20 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock"; // - - - /** The ZooKeeper client to use */ + + /** The ZooKeeper client to use. */ private final CuratorFramework client; - /** The executor to run ZooKeeper callbacks on */ + /** The executor to run ZooKeeper callbacks on. */ private final Executor executor; - /** The runtime configuration */ + /** The runtime configuration. */ private final Configuration configuration; - /** The zookeeper based running jobs registry */ + /** The zookeeper based running jobs registry. */ private final RunningJobsRegistry runningJobsRegistry; - /** Store for arbitrary blobs */ + /** Store for arbitrary blobs. */ private final BlobStoreService blobStoreService; public ZooKeeperHaServices( @@ -233,7 +232,7 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { } /** -* Closes components which don't distinguish between close and closeAndCleanupAllData +* Closes components which don't distinguish between close and closeAndCleanupAllData. */ private void internalClose() { client.close();
[flink] 04/05: [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit 34d634e961ff1cc249deff0c9123e4fcda67d30f Author: Till Rohrmann AuthorDate: Mon Mar 4 10:50:54 2019 +0100 [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress Decrease the timeout from 10s to 1s in testTimeoutOfFindConnectingAddress to speed up the test. --- .../flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 42c1eca..4a1cf80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -197,7 +197,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ */ @Test public void testTimeoutOfFindConnectingAddress() throws Exception { - FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); + FiniteDuration timeout = new FiniteDuration(1L, TimeUnit.SECONDS); LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
[flink] 04/05: [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3f9e3a3f8ba9ace28e961484534217e6fc8a5309 Author: Till Rohrmann AuthorDate: Mon Mar 4 10:50:54 2019 +0100 [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress Decrease the timeout from 10s to 1s in testTimeoutOfFindConnectingAddress to speed up the test. --- .../flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 42c1eca..4a1cf80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -197,7 +197,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ */ @Test public void testTimeoutOfFindConnectingAddress() throws Exception { - FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); + FiniteDuration timeout = new FiniteDuration(1L, TimeUnit.SECONDS); LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
[flink] branch master updated (e8daa49 -> 11a8234)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e8daa49 [FLINK-11796] [State Backends] Remove Snapshotable interface new f60b878 [hotfix] Fix checkstyle violations in ZooKeeperHaServices new f9e352b [hotfix] Fix checkstyle violations in ZooKeeperUtils new c4786e8 [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest new 3f9e3a3 [hotfix][tests] Speed up ZooKeeperLeaderRetrievalTest#testTimeoutOfFindConnectingAddress new 11a8234 [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../zookeeper/ZooKeeperHaServices.java | 91 ++-- .../apache/flink/runtime/util/ZooKeeperUtils.java | 3 +- .../zookeeper/ZooKeeperHaServicesTest.java | 250 + .../ZooKeeperLeaderRetrievalTest.java | 26 ++- 4 files changed, 341 insertions(+), 29 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
[flink] 01/05: [hotfix] Fix checkstyle violations in ZooKeeperHaServices
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f60b8787156f42f07e54f8facf14c8c28982d22e Author: Till Rohrmann AuthorDate: Fri Mar 1 12:55:40 2019 +0100 [hotfix] Fix checkstyle violations in ZooKeeperHaServices --- .../zookeeper/ZooKeeperHaServices.java | 29 +++--- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index 114d281..8b33c2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -45,7 +45,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper. * The services store data in ZooKeeper's nodes as illustrated by the following tree structure: - * + * * * /flink * +/cluster_id_1/resource_manager_lock @@ -56,7 +56,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * || /latest-2 * || * |+/job-id-2/job_manager_lock - * | + * | * +/cluster_id_2/resource_manager_lock * | * +/job-id-1/job_manager_lock @@ -64,18 +64,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull; *|/latest-1 *|/persisted_job_graph * - * + * * The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to * accommodate specific permission. - * - * The "cluster_id" part identifies the data stored for a specific Flink "cluster". 
+ * + * The "cluster_id" part identifies the data stored for a specific Flink "cluster". * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job * on a framework like YARN or Mesos (in a "per-job-cluster" mode). - * + * * In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured * automatically by the client or dispatcher that submits the Job to YARN or Mesos. - * + * * In the case of a standalone cluster, that cluster-id needs to be configured via * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same * cluster and participate in the execution of the same set of jobs. @@ -93,21 +93,20 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock"; // - - - /** The ZooKeeper client to use */ + + /** The ZooKeeper client to use. */ private final CuratorFramework client; - /** The executor to run ZooKeeper callbacks on */ + /** The executor to run ZooKeeper callbacks on. */ private final Executor executor; - /** The runtime configuration */ + /** The runtime configuration. */ private final Configuration configuration; - /** The zookeeper based running jobs registry */ + /** The zookeeper based running jobs registry. */ private final RunningJobsRegistry runningJobsRegistry; - /** Store for arbitrary blobs */ + /** Store for arbitrary blobs. */ private final BlobStoreService blobStoreService; public ZooKeeperHaServices( @@ -233,7 +232,7 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { } /** -* Closes components which don't distinguish between close and closeAndCleanupAllData +* Closes components which don't distinguish between close and closeAndCleanupAllData. */ private void internalClose() { client.close();
[flink] 05/05: [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 11a8234b50586d476371c64f9d751e8e65fdad0a Author: Till Rohrmann AuthorDate: Fri Mar 1 16:26:48 2019 +0100 [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData When calling ZooKeeperHaServices#closeAndCleanupAllData we should delete the HA_CLUSTER_ID znode which is owned by the respective ZooKeeperHaServices. Moreover, the method tries to go up the chain of parent znodes and tries to delete all empty parent nodes. This should clean up parent znodes that would otherwise be orphaned. This closes #7880. --- .../zookeeper/ZooKeeperHaServices.java | 62 + .../zookeeper/ZooKeeperHaServicesTest.java | 250 + .../ZooKeeperLeaderRetrievalTest.java | 12 +- 3 files changed, 318 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index 8b33c2c..1b2ff44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -34,9 +34,13 @@ import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.concurrent.Executor; @@ -224,6 +228,12 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { exception = t; } + try { + cleanupZooKeeperPaths(); + } catch (Throwable t) { + exception = 
ExceptionUtils.firstOrSuppressed(t, exception); + } + internalClose(); if (exception != null) { @@ -232,6 +242,58 @@ public class ZooKeeperHaServices implements HighAvailabilityServices { } /** +* Cleans up leftover ZooKeeper paths. +*/ + private void cleanupZooKeeperPaths() throws Exception { + deleteOwnedZNode(); + tryDeleteEmptyParentZNodes(); + } + + private void deleteOwnedZNode() throws Exception { + // delete the HA_CLUSTER_ID znode which is owned by this cluster + client.delete().deletingChildrenIfNeeded().forPath("/"); + } + + /** +* Tries to delete empty parent znodes. +* +* IMPORTANT: This method can be removed once all supported ZooKeeper versions +* support the container {@link org.apache.zookeeper.CreateMode}. +* +* @throws Exception if the deletion fails for other reason than {@link KeeperException.NotEmptyException} +*/ + private void tryDeleteEmptyParentZNodes() throws Exception { + // try to delete the parent znodes if they are empty + String remainingPath = getParentPath(getNormalizedPath(client.getNamespace())); + final CuratorFramework nonNamespaceClient = client.usingNamespace(null); + + while (!isRootPath(remainingPath)) { + try { + nonNamespaceClient.delete().forPath(remainingPath); + } catch (KeeperException.NotEmptyException ignored) { + // We can only delete empty znodes + break; + } + + remainingPath = getParentPath(remainingPath); + } + } + + private static boolean isRootPath(String remainingPath) { + return ZKPaths.PATH_SEPARATOR.equals(remainingPath); + } + + @Nonnull + private static String getNormalizedPath(String path) { + return ZKPaths.makePath(path, ""); + } + + @Nonnull + private static String getParentPath(String path) { + return ZKPaths.getPathAndNode(path).getPath(); + } + + /** * Closes components which don't distinguish between close and closeAndCleanupAllData. 
*/ private void internalClose() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java new file mode 100644 index 000..0edfdb9 --- /dev/null +++
[flink] 03/05: [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c4786e8519c6394ffccadd661c8462108c70cdc6 Author: Till Rohrmann AuthorDate: Mon Mar 4 10:43:00 2019 +0100 [hotfix] Fix checkstyle violations in ZooKeeperLeaderRetrievalTest --- .../leaderelection/ZooKeeperLeaderRetrievalTest.java | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 0282a4f..42c1eca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -18,9 +18,6 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.test.TestingServer; - import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.blob.VoidBlobStore; @@ -35,12 +32,12 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.TestingServer; import org.junit.After; import org.junit.Before; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; - import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -50,8 +47,13 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.FiniteDuration; + import static org.junit.Assert.assertEquals; +/** + * Tests for 
the ZooKeeper based leader election and retrieval. + */ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ private TestingServer testingServer; @@ -79,7 +81,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ @After public void after() throws Exception { - if(testingServer != null) { + if (testingServer != null) { testingServer.stop(); testingServer = null;
[flink] 02/05: [hotfix] Fix checkstyle violations in ZooKeeperUtils
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f9e352b413d30972d01ccfdd7d70e7cf3843803c Author: Till Rohrmann AuthorDate: Fri Mar 1 15:26:09 2019 +0100 [hotfix] Fix checkstyle violations in ZooKeeperUtils --- .../src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java| 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 86b7e6b..ba253c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -169,8 +169,7 @@ public class ZooKeeperUtils { */ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService( final CuratorFramework client, - final Configuration configuration) throws Exception - { + final Configuration configuration) throws Exception { return createLeaderRetrievalService(client, configuration, ""); }
[flink] 01/02: [FLINK-11731] [State Backends] Make DefaultOperatorStateBackend follow the builder pattern
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit 2f62b890624116cbe205cdcc9762e41f6a80c84f Author: Yu Li AuthorDate: Sat Mar 2 11:19:21 2019 +0100 [FLINK-11731] [State Backends] Make DefaultOperatorStateBackend follow the builder pattern This closes #7899. (cherry picked from commit 94f84a5c7b4876dee5b34e6a3725da7ee06a607f) --- .../flink/streaming/tests/StubStateBackend.java| 9 +- .../flink/runtime/state/AbstractStateBackend.java | 6 +- .../runtime/state/DefaultOperatorStateBackend.java | 486 + .../state/DefaultOperatorStateBackendBuilder.java | 98 + ...efaultOperatorStateBackendSnapshotStrategy.java | 213 + .../state/OperatorStateRestoreOperation.java | 219 ++ .../runtime/state/PartitionableListState.java | 135 ++ .../apache/flink/runtime/state/StateBackend.java | 8 +- .../runtime/state/filesystem/FsStateBackend.java | 13 +- .../state/heap/HeapKeyedStateBackendBuilder.java | 1 + .../runtime/state/memory/MemoryStateBackend.java | 19 +- .../CheckpointSettingsSerializableTest.java| 6 +- .../runtime/state/OperatorStateBackendTest.java| 114 +++-- .../state/StateBackendMigrationTestBase.java | 13 +- .../runtime/state/ttl/mock/MockStateBackend.java | 7 +- .../streaming/state/RocksDBStateBackend.java | 19 +- .../operators/StreamTaskStateInitializerImpl.java | 21 +- .../operators/BackendRestorerProcedureTest.java| 10 +- .../StreamTaskStateInitializerImplTest.java| 5 +- .../runtime/tasks/StreamTaskTerminationTest.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 97 ++-- .../runtime/tasks/TestSpyWrapperStateBackend.java | 8 +- .../test/streaming/runtime/StateBackendITCase.java | 5 +- 23 files changed, 962 insertions(+), 556 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java index dec4f2d..0d0c895 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -96,7 +97,11 @@ final class StubStateBackend implements StateBackend { } @Override - public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception { - return backend.createOperatorStateBackend(env, operatorIdentifier); + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier, + @Nonnull Collection stateHandles, + CloseableRegistry cancelStreamRegistry) throws Exception { + return backend.createOperatorStateBackend(env, operatorIdentifier, stateHandles, cancelStreamRegistry); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 2343d83..8d94898 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -71,6 +71,8 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri @Override public abstract OperatorStateBackend createOperatorStateBackend( - 
Environment env, - String operatorIdentifier) throws Exception; + Environment env, + String operatorIdentifier, + @Nonnull Collection stateHandles, + CloseableRegistry cancelStreamRegistry) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index a3541b6..f6a0dba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++
[flink] branch release-1.8 updated (b027fa7 -> ce2d65d)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a change to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git. from b027fa7 Fix version change expressions in releasing scripts new 2f62b89 [FLINK-11731] [State Backends] Make DefaultOperatorStateBackend follow the builder pattern new ce2d65d [FLINK-11796] [State Backends] Remove Snapshotable interface The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/streaming/tests/StubStateBackend.java| 9 +- .../runtime/state/AbstractKeyedStateBackend.java | 3 +- .../flink/runtime/state/AbstractStateBackend.java | 6 +- .../runtime/state/DefaultOperatorStateBackend.java | 492 + .../state/DefaultOperatorStateBackendBuilder.java | 98 ...efaultOperatorStateBackendSnapshotStrategy.java | 213 + .../flink/runtime/state/OperatorStateBackend.java | 5 +- .../state/OperatorStateRestoreOperation.java | 219 + .../runtime/state/PartitionableListState.java | 135 ++ .../apache/flink/runtime/state/Snapshotable.java | 41 -- .../apache/flink/runtime/state/StateBackend.java | 8 +- .../runtime/state/filesystem/FsStateBackend.java | 13 +- .../runtime/state/heap/HeapKeyedStateBackend.java | 6 - .../state/heap/HeapKeyedStateBackendBuilder.java | 1 + .../runtime/state/memory/MemoryStateBackend.java | 19 +- .../CheckpointSettingsSerializableTest.java| 6 +- .../runtime/state/OperatorStateBackendTest.java| 114 +++-- .../state/StateBackendMigrationTestBase.java | 13 +- .../state/ttl/mock/MockKeyedStateBackend.java | 7 - .../runtime/state/ttl/mock/MockStateBackend.java | 7 +- .../streaming/state/RocksDBKeyedStateBackend.java | 6 - .../streaming/state/RocksDBStateBackend.java | 19 +- .../api/operators/BackendRestorerProcedure.java| 23 +- .../operators/StreamTaskStateInitializerImpl.java | 21 
+- .../operators/BackendRestorerProcedureTest.java| 10 +- .../StreamTaskStateInitializerImplTest.java| 11 +- .../runtime/tasks/StreamTaskTerminationTest.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 97 ++-- .../runtime/tasks/TestSpyWrapperStateBackend.java | 8 +- .../test/streaming/runtime/StateBackendITCase.java | 5 +- 30 files changed, 968 insertions(+), 653 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendBuilder.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
[flink] 02/02: [FLINK-11796] [State Backends] Remove Snapshotable interface
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit ce2d65d88d28bcd5cf182395d511a239dacfb89d Author: Yu Li AuthorDate: Sat Mar 2 20:00:43 2019 +0100 [FLINK-11796] [State Backends] Remove Snapshotable interface (cherry picked from commit e8daa49a593edc401cd44761b25b1324b11be4a6) --- .../runtime/state/AbstractKeyedStateBackend.java | 3 +- .../runtime/state/DefaultOperatorStateBackend.java | 8 + .../flink/runtime/state/OperatorStateBackend.java | 5 ++- .../apache/flink/runtime/state/Snapshotable.java | 41 -- .../runtime/state/heap/HeapKeyedStateBackend.java | 6 .../state/ttl/mock/MockKeyedStateBackend.java | 7 .../streaming/state/RocksDBKeyedStateBackend.java | 6 .../api/operators/BackendRestorerProcedure.java| 23 ++-- .../StreamTaskStateInitializerImplTest.java| 6 9 files changed, 7 insertions(+), 98 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 98e21ff..e28aeef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -52,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public abstract class AbstractKeyedStateBackend implements KeyedStateBackend, - Snapshotable, Collection>, + SnapshotStrategy>, Closeable, CheckpointListener { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index f6a0dba..48c8eb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -39,7 +39,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; -import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.RunnableFuture; @@ -246,13 +245,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } // --- - // Snapshot and restore + // Snapshot // --- - - public void restore(Collection restoreSnapshots) throws Exception { - // all restore work done in builder and nothing to do here - } - @Nonnull @Override public RunnableFuture> snapshot( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java index 3cbb351..4fb8024 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java @@ -22,16 +22,15 @@ import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.util.Disposable; import java.io.Closeable; -import java.util.Collection; /** * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface - * {@link Snapshotable} + * {@link SnapshotStrategy} * */ public interface OperatorStateBackend extends OperatorStateStore, - Snapshotable, Collection>, + SnapshotStrategy>, Closeable, Disposable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java deleted file mode 100644 index 1677855..000 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - *
[flink] branch master updated (fb256d4 -> e8daa49)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from fb256d4 [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory for Blink (#7898) new 94f84a5 [FLINK-11731] [State Backends] Make DefaultOperatorStateBackend follow the builder pattern new e8daa49 [FLINK-11796] [State Backends] Remove Snapshotable interface The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/streaming/tests/StubStateBackend.java| 9 +- .../runtime/state/AbstractKeyedStateBackend.java | 3 +- .../flink/runtime/state/AbstractStateBackend.java | 6 +- .../runtime/state/DefaultOperatorStateBackend.java | 492 + .../state/DefaultOperatorStateBackendBuilder.java | 98 ...efaultOperatorStateBackendSnapshotStrategy.java | 213 + .../flink/runtime/state/OperatorStateBackend.java | 5 +- .../state/OperatorStateRestoreOperation.java | 219 + .../runtime/state/PartitionableListState.java | 135 ++ .../apache/flink/runtime/state/Snapshotable.java | 41 -- .../apache/flink/runtime/state/StateBackend.java | 8 +- .../runtime/state/filesystem/FsStateBackend.java | 13 +- .../runtime/state/heap/HeapKeyedStateBackend.java | 6 - .../state/heap/HeapKeyedStateBackendBuilder.java | 1 + .../runtime/state/memory/MemoryStateBackend.java | 19 +- .../CheckpointSettingsSerializableTest.java| 6 +- .../runtime/state/OperatorStateBackendTest.java| 114 +++-- .../state/StateBackendMigrationTestBase.java | 13 +- .../state/ttl/mock/MockKeyedStateBackend.java | 7 - .../runtime/state/ttl/mock/MockStateBackend.java | 7 +- .../streaming/state/RocksDBKeyedStateBackend.java | 6 - .../streaming/state/RocksDBStateBackend.java | 19 +- .../api/operators/BackendRestorerProcedure.java| 23 +- 
.../operators/StreamTaskStateInitializerImpl.java | 21 +- .../operators/BackendRestorerProcedureTest.java| 10 +- .../StreamTaskStateInitializerImplTest.java| 11 +- .../runtime/tasks/StreamTaskTerminationTest.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 97 ++-- .../runtime/tasks/TestSpyWrapperStateBackend.java | 8 +- .../test/streaming/runtime/StateBackendITCase.java | 5 +- 30 files changed, 968 insertions(+), 653 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendBuilder.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
[flink] 02/02: [FLINK-11796] [State Backends] Remove Snapshotable interface
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e8daa49a593edc401cd44761b25b1324b11be4a6 Author: Yu Li AuthorDate: Sun Mar 3 03:00:43 2019 +0800 [FLINK-11796] [State Backends] Remove Snapshotable interface --- .../runtime/state/AbstractKeyedStateBackend.java | 3 +- .../runtime/state/DefaultOperatorStateBackend.java | 8 + .../flink/runtime/state/OperatorStateBackend.java | 5 ++- .../apache/flink/runtime/state/Snapshotable.java | 41 -- .../runtime/state/heap/HeapKeyedStateBackend.java | 6 .../state/ttl/mock/MockKeyedStateBackend.java | 7 .../streaming/state/RocksDBKeyedStateBackend.java | 6 .../api/operators/BackendRestorerProcedure.java| 23 ++-- .../StreamTaskStateInitializerImplTest.java| 6 9 files changed, 7 insertions(+), 98 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 98e21ff..e28aeef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -36,7 +36,6 @@ import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -52,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public abstract class AbstractKeyedStateBackend implements KeyedStateBackend, - Snapshotable, Collection>, + SnapshotStrategy>, Closeable, CheckpointListener { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index f6a0dba..48c8eb8 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -39,7 +39,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; -import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.RunnableFuture; @@ -246,13 +245,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } // --- - // Snapshot and restore + // Snapshot // --- - - public void restore(Collection restoreSnapshots) throws Exception { - // all restore work done in builder and nothing to do here - } - @Nonnull @Override public RunnableFuture> snapshot( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java index 3cbb351..4fb8024 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java @@ -22,16 +22,15 @@ import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.util.Disposable; import java.io.Closeable; -import java.util.Collection; /** * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface - * {@link Snapshotable} + * {@link SnapshotStrategy} * */ public interface OperatorStateBackend extends OperatorStateStore, - Snapshotable, Collection>, + SnapshotStrategy>, Closeable, Disposable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java deleted file mode 100644 index 1677855..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable
[flink] branch master updated: [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory for Blink (#7898)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new fb256d4 [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory for Blink (#7898) fb256d4 is described below commit fb256d48e52c490b6a899c17c1225d30f7a6c92f Author: Jingsong Lee AuthorDate: Tue Mar 5 21:17:15 2019 +0800 [FLINK-11803][table-planner-blink] Improve FlinkTypeFactory for Blink (#7898) --- .../flink/table/calcite/FlinkTypeFactory.scala | 167 - .../flink/table/calcite/FlinkTypeSystem.scala | 51 +++ .../flink/table/plan/schema/ArrayRelDataType.scala | 55 +++ .../flink/table/plan/schema/InlineTable.scala | 18 +-- .../flink/table/plan/schema/MapRelDataType.scala | 50 ++ .../flink/table/plan/schema/RowRelDataType.scala | 89 +++ .../apache/flink/table/plan/schema/RowSchema.scala | 72 + .../flink/table/calcite/FlinkTypeFactoryTest.scala | 75 + 8 files changed, 497 insertions(+), 80 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index ba52152..0ba2252 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -18,28 +18,22 @@ package org.apache.flink.table.calcite -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo._ -import org.apache.flink.api.java.typeutils.ValueTypeInfo._ -import org.apache.flink.table.`type`.DecimalType +import org.apache.flink.table.`type`.{ArrayType, DecimalType, InternalType, InternalTypes, MapType, RowType} import org.apache.flink.table.api.TableException -import 
org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName -import org.apache.flink.table.typeutils._ +import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema} -import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`._ -import org.apache.calcite.sql.SqlIntervalQualifier import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.sql.parser.SqlParserPos import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable /** - * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] + * Flink specific type factory that represents the interface between Flink's [[InternalType]] * and Calcite's [[RelDataType]]. */ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { @@ -47,39 +41,51 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp // NOTE: for future data types it might be necessary to // override more methods of RelDataTypeFactoryImpl - def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match { -case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => false -case _: BasicTypeInfo[_] => false -case _: SqlTimeTypeInfo[_] => false -case _: TimeIntervalTypeInfo[_] => false -case _ => true - } + private val seenTypes = mutable.HashMap[(InternalType, Boolean), RelDataType]() - def createTypeFromTypeInfo( - typeInfo: TypeInformation[_], + def createTypeFromInternalType( + tp: InternalType, isNullable: Boolean): RelDataType = { -// we cannot use seenTypes for simple types, -// because time indicators and timestamps would be the same - -val relType = if (!isAdvanced(typeInfo)) { - // simple types can be converted to SQL types and vice versa - val sqlType = typeInfoToSqlTypeName(typeInfo) - sqlType match { - -case 
INTERVAL_YEAR_MONTH => - createSqlIntervalType( -new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)) - -case INTERVAL_DAY_SECOND => - createSqlIntervalType( -new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) - -case _ => - createSqlType(sqlType) - } -} else { - throw new TableException("Unsupported now") +val relType = seenTypes.get((tp, isNullable)) match { + case Some(retType: RelDataType) => retType + case None => +val refType = tp match { + case InternalTypes.BOOLEAN => createSqlType(BOOLEAN) + case InternalTypes.BYTE => createSqlType(TINYINT) + case InternalTypes.SHORT =>
[flink] branch release-1.7 updated (e3c0e69 -> 70bc26c)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git. from e3c0e69 [FLINK-9003][E2E] Add operators with input type that goes through custom, stateful serialization new d099898 [hotfix][core] Fix Tuple0Serializer handling of null new 13b3e6b [hotfix][core] Comparing whole collections rather than contents in KryoGenericTypeSerializerTest new c3b49ee [hotfix][tests] Call all methods from SerializerTestBase in SerializerTestInstance by reflection new 54cf610 [FLINK-11420][core] Fixed duplicate method of TraversableSerializer new f4c0991 [hotfix][tests] Added matcher that performs deep comparison with taking Tuples into account new 35778da [hotfix][core] Added snapshot for NothingSerializerSnapshot new 70bc26c [FLINK-11823][core] Fixed duplicate method of TrySerializer The 7 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../java/typeutils/runtime/Tuple0Serializer.java | 2 + .../common/typeutils/CompositeSerializerTest.java | 6 - .../api/common/typeutils/SerializerTestBase.java | 82 -- .../common/typeutils/SerializerTestInstance.java | 55 +-- .../java/typeutils/runtime/RowSerializerTest.java | 14 +- .../typeutils/runtime/TupleSerializerTest.java | 3 +- .../runtime/TupleSerializerTestInstance.java | 79 -- .../flink/testutils/CustomEqualityMatcher.java | 70 + .../flink/testutils/DeeplyEqualsChecker.java | 175 + .../valuearray/ByteValueArraySerializerTest.java | 3 +- .../valuearray/CharValueArraySerializerTest.java | 3 +- .../valuearray/DoubleValueArraySerializerTest.java | 3 +- .../valuearray/FloatValueArraySerializerTest.java | 3 +- .../valuearray/IntValueArraySerializerTest.java| 3 +- .../valuearray/LongValueArraySerializerTest.java | 3 +- .../valuearray/NullValueArraySerializerTest.java | 3 +- .../valuearray/ShortValueArraySerializerTest.java | 3 +- .../valuearray/StringValueArraySerializerTest.java | 3 +- .../valuearray/ValueArraySerializerTestBase.java | 38 ++--- .../api/scala/typeutils/NothingSerializer.scala| 10 +- .../scala/typeutils/TraversableSerializer.scala| 2 +- .../flink/api/scala/typeutils/TrySerializer.scala | 17 +- .../runtime/KryoGenericTypeSerializerTest.scala| 8 +- .../runtime/ScalaSpecialTypesSerializerTest.scala | 83 +++--- .../scala/runtime/TraversableSerializerTest.scala | 17 -- .../runtime/TupleSerializerTestInstance.scala | 81 +- 26 files changed, 471 insertions(+), 298 deletions(-) delete mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTestInstance.java create mode 100644 flink-core/src/test/java/org/apache/flink/testutils/CustomEqualityMatcher.java create mode 100644 flink-core/src/test/java/org/apache/flink/testutils/DeeplyEqualsChecker.java copy flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringSerializerTest.java => 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArraySerializerTestBase.java (56%)
[flink] branch release-1.8 updated: Fix version change expressions in releasing scripts
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new b027fa7 Fix version change expressions in releasing scripts b027fa7 is described below commit b027fa74fd677e59d0cffe6c74bc4915521a1c61 Author: Aljoscha Krettek AuthorDate: Tue Mar 5 11:42:44 2019 +0100 Fix version change expressions in releasing scripts The earlier version had "\1${NEW_VERSION}" in there, which would resolve to "\11.8.", i.e. the backreference would now be \11. --- tools/releasing/create_release_branch.sh | 2 +- tools/releasing/update_branch_version.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/releasing/create_release_branch.sh b/tools/releasing/create_release_branch.sh index 9673e62..81619e7 100755 --- a/tools/releasing/create_release_branch.sh +++ b/tools/releasing/create_release_branch.sh @@ -57,7 +57,7 @@ fi git checkout -b $target_branch #change version in all pom files -find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#' {} \; +find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#' {} \; #change version of documentation cd docs diff --git a/tools/releasing/update_branch_version.sh b/tools/releasing/update_branch_version.sh index 10290d9..6f00093 100755 --- a/tools/releasing/update_branch_version.sh +++ b/tools/releasing/update_branch_version.sh @@ -49,7 +49,7 @@ fi cd .. #change version in all pom files -find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#' {} \; +find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#' {} \; #change version of documentation cd docs
[flink] branch master updated: Fix version change expressions in releasing scripts
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 60bedd9 Fix version change expressions in releasing scripts 60bedd9 is described below commit 60bedd9dcda3d10b15999b124375ea381a4eee2d Author: Aljoscha Krettek AuthorDate: Tue Mar 5 11:42:44 2019 +0100 Fix version change expressions in releasing scripts The earlier version had "\1${NEW_VERSION}" in there, which would resolve to "\11.8.", i.e. the backreference would now be \11. --- tools/releasing/create_release_branch.sh | 2 +- tools/releasing/update_branch_version.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/releasing/create_release_branch.sh b/tools/releasing/create_release_branch.sh index 9673e62..81619e7 100755 --- a/tools/releasing/create_release_branch.sh +++ b/tools/releasing/create_release_branch.sh @@ -57,7 +57,7 @@ fi git checkout -b $target_branch #change version in all pom files -find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#' {} \; +find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#' {} \; #change version of documentation cd docs diff --git a/tools/releasing/update_branch_version.sh b/tools/releasing/update_branch_version.sh index 10290d9..6f00093 100755 --- a/tools/releasing/update_branch_version.sh +++ b/tools/releasing/update_branch_version.sh @@ -49,7 +49,7 @@ fi cd .. #change version in all pom files -find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#' {} \; +find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#${1}'$NEW_VERSION'${2}#' {} \; #change version of documentation cd docs
[flink] branch release-1.8 updated: Fix create_release_branch.sh to accommodate Hadoop Versions
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 571997b Fix create_release_branch.sh to accommodate Hadoop Versions 571997b is described below commit 571997b2e3e7b078d52f5e866d62a1c43050646c Author: Aljoscha Krettek AuthorDate: Tue Mar 5 11:18:33 2019 +0100 Fix create_release_branch.sh to accommodate Hadoop Versions Before, this would not correctly change version tags that have a version prefix or suffix, like the shaded Hadoop modules. --- tools/releasing/create_release_branch.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/releasing/create_release_branch.sh b/tools/releasing/create_release_branch.sh index 615722b..9673e62 100755 --- a/tools/releasing/create_release_branch.sh +++ b/tools/releasing/create_release_branch.sh @@ -57,7 +57,7 @@ fi git checkout -b $target_branch #change version in all pom files -find . -name 'pom.xml' -type f -exec perl -pi -e 's#'$OLD_VERSION'#'$NEW_VERSION'#' {} \; +find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#' {} \; #change version of documentation cd docs
[flink] branch master updated: Fix create_release_branch.sh to accommodate Hadoop Versions
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9d6f221 Fix create_release_branch.sh to accommodate Hadoop Versions 9d6f221 is described below commit 9d6f221ba9b514a0929d0a0e429852d71b7d61f8 Author: Aljoscha Krettek AuthorDate: Tue Mar 5 11:18:33 2019 +0100 Fix create_release_branch.sh to accommodate Hadoop Versions Before, this would not correctly change version tags that have a version prefix or suffix, like the shaded Hadoop modules. --- tools/releasing/create_release_branch.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/releasing/create_release_branch.sh b/tools/releasing/create_release_branch.sh index 615722b..9673e62 100755 --- a/tools/releasing/create_release_branch.sh +++ b/tools/releasing/create_release_branch.sh @@ -57,7 +57,7 @@ fi git checkout -b $target_branch #change version in all pom files -find . -name 'pom.xml' -type f -exec perl -pi -e 's#'$OLD_VERSION'#'$NEW_VERSION'#' {} \; +find . -name 'pom.xml' -type f -exec perl -pi -e 's#(.*)'$OLD_VERSION'(.*)#\1'$NEW_VERSION'\2#' {} \; #change version of documentation cd docs
[flink] branch master updated: [hotfix][table-api] Add docs to over-windowed table's select()
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a2a9c5c [hotfix][table-api] Add docs to over-windowed table's select() a2a9c5c is described below commit a2a9c5c6398c8b3213ff07cd982b252b6c104053 Author: Timo Walther AuthorDate: Tue Mar 5 10:04:51 2019 +0100 [hotfix][table-api] Add docs to over-windowed table's select() --- .../scala/org/apache/flink/table/api/table.scala | 20 1 file changed, 20 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala index 8638650..7c0777c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala @@ -1228,6 +1228,16 @@ class OverWindowedTable( private[flink] val table: Table, private[flink] val overWindows: Array[OverWindow]) { + /** +* Performs a selection operation on an over-windowed table. Similar to an SQL SELECT statement. +* The field expressions can contain complex expressions and aggregations. +* +* Example: +* +* {{{ +* overWindowedTable.select('c, 'b.count over 'ow, 'e.sum over 'ow) +* }}} +*/ def select(fields: Expression*): Table = { val expandedFields = expandProjectList( fields, @@ -1252,6 +1262,16 @@ class OverWindowedTable( ) } + /** +* Performs a selection operation on an over-windowed table. Similar to an SQL SELECT statement. +* The field expressions can contain complex expressions and aggregations. +* +* Example: +* +* {{{ +* overWindowedTable.select("c, b.count over ow, e.sum over ow") +* }}} +*/ def select(fields: String): Table = { val fieldExprs = ExpressionParser.parseExpressionList(fields) //get the correct expression for AggFunctionCall
[flink] branch master updated: [hotfix][table-api] Remove deprecated table function code in insertInto
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9297f61 [hotfix][table-api] Remove deprecated table function code in insertInto 9297f61 is described below commit 9297f611c9df69e7b99652a218a1404e28d2afc7 Author: Timo Walther AuthorDate: Tue Mar 5 09:41:23 2019 +0100 [hotfix][table-api] Remove deprecated table function code in insertInto --- .../main/scala/org/apache/flink/table/api/table.scala| 16 ++-- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala index 44dac79..8638650 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala @@ -1025,13 +1025,7 @@ class Table( * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written. */ def insertInto(tableName: String): Unit = { -this.logicalPlan match { - case _: LogicalTableFunctionCall => -throw new ValidationException("Table functions can only be used in table.joinLateral() " + - "and table.leftOuterJoinLateral().") - case _ => -tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig) -} +insertInto(tableName, tableEnv.queryConfig) } /** @@ -1047,13 +1041,7 @@ class Table( * @param conf The [[QueryConfig]] to use. */ def insertInto(tableName: String, conf: QueryConfig): Unit = { -this.logicalPlan match { - case _: LogicalTableFunctionCall => -throw new ValidationException( - "Table functions can only be used for joinLateral() and leftOuterJoinLateral().") - case _ => -tableEnv.insertInto(this, tableName, conf) -} +tableEnv.insertInto(this, tableName, conf) } /**