Build failed in Jenkins: flink-snapshot-deployment #1004

2018-11-05 Thread Apache Jenkins Server
See 


--
[...truncated 1.73 MB...]
[INFO] Excluding joda-time:joda-time:jar:2.5 from the shaded jar.
[INFO] Excluding com.amazonaws:jmespath-java:jar:1.11.437 from the shaded jar.
[INFO] Including org.apache.flink:force-shading:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.objenesis:objenesis:jar:2.1 from the shaded jar.
[INFO] Excluding org.scala-lang:scala-library:jar:2.11.12 from the shaded jar.
[INFO] Excluding com.typesafe.akka:akka-actor_2.11:jar:2.4.20 from the shaded jar.
[INFO] Excluding com.typesafe:config:jar:1.3.0 from the shaded jar.
[INFO] Excluding org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0 from the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing 

 with 

[INFO] Replacing original test artifact with shaded test artifact.
[INFO] Replacing 

 with 

[INFO] Dependency-reduced POM written at: 

[INFO] 
[INFO] --- maven-shade-plugin:3.0.0:shade (S3UtilProgram) @ flink-e2e-test-utils ---
[INFO] Including org.apache.flink:flink-streaming-java_2.11:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-runtime_2.11:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-core:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.4.1 in the shaded jar.
[INFO] Including org.tukaani:xz:jar:1.5 in the shaded jar.
[INFO] Including org.apache.flink:flink-java:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-hadoop-fs:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including commons-io:commons-io:jar:2.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-netty:jar:4.1.24.Final-5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm:jar:5.0.4-5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-jackson:jar:2.7.9-5.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Including commons-cli:commons-cli:jar:1.3.1 in the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including com.typesafe.akka:akka-stream_2.11:jar:2.4.20 in the shaded jar.
[INFO] Including org.reactivestreams:reactive-streams:jar:1.0.0 in the shaded jar.
[INFO] Including com.typesafe:ssl-config-core_2.11:jar:0.2.1 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including com.typesafe.akka:akka-protobuf_2.11:jar:2.4.20 in the shaded jar.
[INFO] Including com.typesafe.akka:akka-slf4j_2.11:jar:2.4.20 in the shaded jar.
[INFO] Including org.clapper:grizzled-slf4j_2.11:jar:1.3.2 in the shaded jar.
[INFO] Including com.github.scopt:scopt_2.11:jar:3.5.0 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including com.twitter:chill_2.11:jar:0.7.6 in the shaded jar.
[INFO] Including com.twitter:chill-java:jar:0.7.6 in the shaded jar.
[INFO] Including org.apache.flink:flink-clients_2.11:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-optimizer_2.11:jar:1.8-SNAPSHOT in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-5.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-math3:jar:3.5 in the shaded jar.
[INFO] Including com.amazonaws:aws-java-sdk-s3:jar:1.11.437 in the shaded jar.
[INFO] Including 

Jenkins build is back to normal : flink-snapshot-deployment-1.6 #93

2018-11-05 Thread Apache Jenkins Server
See 




[flink] branch master updated: [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d375c0a  [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
d375c0a is described below

commit d375c0a5824629c701c15ea228ac1ce22e46dc3b
Author: Nico Kruber 
AuthorDate: Tue Oct 23 18:47:02 2018 +0200

[FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
---
 .../typeutils/base/EnumSerializerUpgradeTest.java  | 37 ++
 .../apache/flink/testutils/ClassLoaderUtils.java   | 59 ++
 .../runtime/rpc/messages/RemoteRpcInvocation.java  | 42 ---
 .../runtime/classloading/ClassLoaderTest.java  | 59 +-
 4 files changed, 155 insertions(+), 42 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index e906f62..fb11945 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -23,21 +23,17 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerialization
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 public class EnumSerializerUpgradeTest extends TestLogger {
 
@@ -87,7 +83,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
private static TypeSerializerSchemaCompatibility 
checkCompatibility(String enumSourceA, String enumSourceB)
throws IOException, ClassNotFoundException {
 
-   ClassLoader classLoader = compileAndLoadEnum(
+   ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceA);
 
EnumSerializer enumSerializer = new 
EnumSerializer(classLoader.loadClass(ENUM_NAME));
@@ -103,7 +99,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
snapshotBytes = outBuffer.toByteArray();
}
 
-   ClassLoader classLoader2 = compileAndLoadEnum(
+   ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceB);
 
TypeSerializerSnapshot restoredSnapshot;
@@ -118,29 +114,4 @@ public class EnumSerializerUpgradeTest extends TestLogger {
EnumSerializer enumSerializer2 = new 
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
return 
restoredSnapshot.resolveSchemaCompatibility(enumSerializer2);
}
-
-   private static ClassLoader compileAndLoadEnum(File root, String 
filename, String source) throws IOException {
-   File file = writeSourceFile(root, filename, source);
-
-   compileClass(file);
-
-   return new URLClassLoader(
-   new URL[]{root.toURI().toURL()},
-   Thread.currentThread().getContextClassLoader());
-   }
-
-   private static File writeSourceFile(File root, String filename, String 
source) throws IOException {
-   File file = new File(root, filename);
-   FileWriter fileWriter = new FileWriter(file);
-
-   fileWriter.write(source);
-   fileWriter.close();
-
-   return file;
-   }
-
-   private static int compileClass(File sourceFile) {
-   JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
-   return compiler.run(null, null, null, "-proc:none", 
sourceFile.getPath());
-   }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
new file mode 100644
index 000..0688c1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software 

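The content of the newly added ClassLoaderUtils.java is cut off above. As a rough sketch only, reassembled from the private helpers this commit removes from EnumSerializerUpgradeTest (not the committed file itself, and with an illustrative class name), a compileAndLoadJava utility of this shape writes the source to disk, compiles it with the JDK compiler, and loads it through a URLClassLoader rooted at the output directory:

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

// Sketch only; everything beyond the compileAndLoadJava signature is illustrative.
public final class ClassLoaderUtilsSketch {

	private ClassLoaderUtilsSketch() {
	}

	/** Writes the source file, compiles it, and loads it via a URLClassLoader rooted at {@code root}. */
	public static ClassLoader compileAndLoadJava(File root, String filename, String source) throws IOException {
		File file = writeSourceFile(root, filename, source);
		compileClass(file);

		return new URLClassLoader(
			new URL[]{root.toURI().toURL()},
			Thread.currentThread().getContextClassLoader());
	}

	private static File writeSourceFile(File root, String filename, String source) throws IOException {
		File file = new File(root, filename);
		try (FileWriter fileWriter = new FileWriter(file)) {
			fileWriter.write(source);
		}
		return file;
	}

	private static int compileClass(File sourceFile) {
		// Requires running on a JDK; ToolProvider returns null on a plain JRE.
		JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
		return compiler.run(null, null, null, "-proc:none", sourceFile.getPath());
	}
}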
[flink] branch release-1.6 updated: [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new c0c9cf2  [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
c0c9cf2 is described below

commit c0c9cf28740ae1cc7dcd231e014844b204a964ca
Author: Nico Kruber 
AuthorDate: Tue Oct 23 18:47:02 2018 +0200

[FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
---
 .../typeutils/base/EnumSerializerUpgradeTest.java  | 37 ++
 .../apache/flink/testutils/ClassLoaderUtils.java   | 59 ++
 .../runtime/rpc/messages/RemoteRpcInvocation.java  | 42 ---
 .../runtime/classloading/ClassLoaderTest.java  | 59 +-
 4 files changed, 155 insertions(+), 42 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index 1f67acb..2bcae45 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -23,21 +23,17 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 public class EnumSerializerUpgradeTest extends TestLogger {
 
@@ -87,7 +83,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
private static CompatibilityResult checkCompatibility(String 
enumSourceA, String enumSourceB)
throws IOException, ClassNotFoundException {
 
-   ClassLoader classLoader = compileAndLoadEnum(
+   ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceA);
 
EnumSerializer enumSerializer = new 
EnumSerializer(classLoader.loadClass(ENUM_NAME));
@@ -102,7 +98,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
snapshotBytes = outBuffer.toByteArray();
}
 
-   ClassLoader classLoader2 = compileAndLoadEnum(
+   ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceB);
 
TypeSerializerConfigSnapshot restoredSnapshot;
@@ -116,29 +112,4 @@ public class EnumSerializerUpgradeTest extends TestLogger {
EnumSerializer enumSerializer2 = new 
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
return enumSerializer2.ensureCompatibility(restoredSnapshot);
}
-
-   private static ClassLoader compileAndLoadEnum(File root, String 
filename, String source) throws IOException {
-   File file = writeSourceFile(root, filename, source);
-
-   compileClass(file);
-
-   return new URLClassLoader(
-   new URL[]{root.toURI().toURL()},
-   Thread.currentThread().getContextClassLoader());
-   }
-
-   private static File writeSourceFile(File root, String filename, String 
source) throws IOException {
-   File file = new File(root, filename);
-   FileWriter fileWriter = new FileWriter(file);
-
-   fileWriter.write(source);
-   fileWriter.close();
-
-   return file;
-   }
-
-   private static int compileClass(File sourceFile) {
-   JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
-   return compiler.run(null, null, null, "-proc:none", 
sourceFile.getPath());
-   }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
new file mode 100644
index 000..0688c1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software 

[flink] branch release-1.7 updated: [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 82c5267  [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
82c5267 is described below

commit 82c52670a9d13b629b6af387fe0a6ca9b7bd5966
Author: Nico Kruber 
AuthorDate: Tue Oct 23 18:47:02 2018 +0200

[FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException
---
 .../typeutils/base/EnumSerializerUpgradeTest.java  | 37 ++
 .../apache/flink/testutils/ClassLoaderUtils.java   | 59 ++
 .../runtime/rpc/messages/RemoteRpcInvocation.java  | 42 ---
 .../runtime/classloading/ClassLoaderTest.java  | 59 +-
 4 files changed, 155 insertions(+), 42 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index e906f62..fb11945 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -23,21 +23,17 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerialization
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.tools.JavaCompiler;
-import javax.tools.ToolProvider;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 public class EnumSerializerUpgradeTest extends TestLogger {
 
@@ -87,7 +83,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
private static TypeSerializerSchemaCompatibility 
checkCompatibility(String enumSourceA, String enumSourceB)
throws IOException, ClassNotFoundException {
 
-   ClassLoader classLoader = compileAndLoadEnum(
+   ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceA);
 
EnumSerializer enumSerializer = new 
EnumSerializer(classLoader.loadClass(ENUM_NAME));
@@ -103,7 +99,7 @@ public class EnumSerializerUpgradeTest extends TestLogger {
snapshotBytes = outBuffer.toByteArray();
}
 
-   ClassLoader classLoader2 = compileAndLoadEnum(
+   ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(), ENUM_NAME + ".java", 
enumSourceB);
 
TypeSerializerSnapshot restoredSnapshot;
@@ -118,29 +114,4 @@ public class EnumSerializerUpgradeTest extends TestLogger {
EnumSerializer enumSerializer2 = new 
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
return 
restoredSnapshot.resolveSchemaCompatibility(enumSerializer2);
}
-
-   private static ClassLoader compileAndLoadEnum(File root, String 
filename, String source) throws IOException {
-   File file = writeSourceFile(root, filename, source);
-
-   compileClass(file);
-
-   return new URLClassLoader(
-   new URL[]{root.toURI().toURL()},
-   Thread.currentThread().getContextClassLoader());
-   }
-
-   private static File writeSourceFile(File root, String filename, String 
source) throws IOException {
-   File file = new File(root, filename);
-   FileWriter fileWriter = new FileWriter(file);
-
-   fileWriter.write(source);
-   fileWriter.close();
-
-   return file;
-   }
-
-   private static int compileClass(File sourceFile) {
-   JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
-   return compiler.run(null, null, null, "-proc:none", 
sourceFile.getPath());
-   }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
new file mode 100644
index 000..0688c1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache 

svn commit: r30687 - /dev/flink/flink-1.7.0/

2018-11-05 Thread trohrmann
Author: trohrmann
Date: Mon Nov  5 20:40:42 2018
New Revision: 30687

Log:
Add flink-1.7.0-rc1 release files

Added:
dev/flink/flink-1.7.0/
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.12.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.12.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.12.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop26-scala_2.11.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop26-scala_2.11.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop26-scala_2.11.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop26-scala_2.12.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop26-scala_2.12.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop26-scala_2.12.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop27-scala_2.11.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop27-scala_2.11.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop27-scala_2.11.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop27-scala_2.12.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop27-scala_2.12.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop27-scala_2.12.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-scala_2.11.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-scala_2.11.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-scala_2.11.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-bin-scala_2.12.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-bin-scala_2.12.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-bin-scala_2.12.tgz.sha512
dev/flink/flink-1.7.0/flink-1.7.0-src.tgz   (with props)
dev/flink/flink-1.7.0/flink-1.7.0-src.tgz.asc
dev/flink/flink-1.7.0/flink-1.7.0-src.tgz.sha512

Added: dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz
--
svn:mime-type = application/octet-stream

Added: dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.asc
==
--- dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.asc (added)
+++ dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.asc Mon Nov  5 20:40:42 2018
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiEENFF/bqllF4fQbsqJHzAlaals/9UFAlvgYaQACgkQHzAlaals
+/9V1dQ/8CvmgZ1ZALezta2Ui7zJu+lqnYC163rV0yHOtxT11f3K+cTF5Wg7lNVGI
+0PB4udehjAYCxwV8w+nOhFKMrkLBrUakg3hCSUna62ajIq6Zv4bo8APpsg96s1F1
+u6Wr9Q1PKqa7TaQTLMsCww+ndNEBzu3/J7oezfwPQ1/CfPS5DCTkW7k6Z7uc/zIk
+GN0ABdYi38KIoN9CLtLdscWQmAaHLO8YV82zIFPLP33/xS/7Fco8ljjAS1aw9IJJ
+XqKcg6tZFSqKql8r3run3MRp06gLK59yIEqeZKUUTN5+Tots2ntSagGov1fF7WhH
+9pkGRABQRtWY1Jvi6fvPJykODGPuKRpEo7otHdG5CPhFFDlZ2gMtWP2bif7fn8x3
+Jhw7431sMOgxcngBk0JG4GfZQoGhUnW4xZ00Y03b41+pT7f/Kjz6Etweponx1W+y
+/psdF9aWqUC0WYfszqYJiPFa7a0eo+E4DRxWkuHzZw1UfB9yj64LxTzqsgwm6CPO
+I5xrCSJ+PyQh98pb/oINbHWdDNXHvofBiFJ1eBKHj1yKE/UyDlJKBD2hsrNhScm4
+bXP0sc9hcHR2XjNFyBk0zJpYm/bdSsm65Ow74w7dJOjPo7w73RH1uju/1Djlslkc
+cPDDaFInVKGdlrxWo0GUHnLSDlPO4vUm4qq1VmguZaXJpT76o18=
+=aU7Z
+-END PGP SIGNATURE-

Added: dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.sha512
==
--- dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.sha512 (added)
+++ dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.11.tgz.sha512 Mon Nov  5 20:40:42 2018
@@ -0,0 +1 @@
+1aa4ecabf17f5d3e59cb9e6e655a210b50f37420b13be1bb1927ee5fcdff2acdbee6a456146462973de36b788f2451d4a76e4d52d78b3859130e7b94522e2af8  flink-1.7.0-bin-hadoop24-scala_2.11.tgz

Added: dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.12.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-1.7.0/flink-1.7.0-bin-hadoop24-scala_2.12.tgz
--
svn:mime-type = application/octet-stream

Added: 

[flink] branch release-1.5 updated: [FLINK-10773] Harden resume externalized checkpoint end-to-end test

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
 new d1d31d0  [FLINK-10773] Harden resume externalized checkpoint end-to-end test
d1d31d0 is described below

commit d1d31d06d28021bfcf66d2b1ef2454ecfbb8a5a2
Author: Till Rohrmann 
AuthorDate: Sun Nov 4 19:32:44 2018 +0100

[FLINK-10773] Harden resume externalized checkpoint end-to-end test

Ignore the 'Artificial Failure' exceptions and rename
ExceptionThrowingFailureMapper into FailureMapper to avoid false
positive exception matchings.
---
 .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java | 6 --
 .../apache/flink/streaming/tests/DataStreamAllroundTestProgram.java | 6 +++---
 .../{ExceptionThrowingFailureMapper.java => FailureMapper.java} | 4 ++--
 flink-end-to-end-tests/test-scripts/common.sh   | 1 +
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 4710100..68b752b 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -311,6 +311,8 @@ class DataStreamAllroundTestJobFactory {

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue( {
 
+   private static final long serialVersionUID = 
-3154419724891779938L;
+
@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
@@ -339,8 +341,8 @@ class DataStreamAllroundTestJobFactory {
return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), 
TEST_SIMULATE_FAILURE.defaultValue());
}
 
-   static MapFunction 
createExceptionThrowingFailureMapper(ParameterTool pt) {
-   return new ExceptionThrowingFailureMapper<>(
+   static MapFunction createFailureMapper(ParameterTool pt) {
+   return new FailureMapper<>(
pt.getLong(
TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),

TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index afbc01a..c7efe42 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -32,7 +32,7 @@ import java.util.Collections;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -58,7 +58,7 @@ public class DataStreamAllroundTestProgram {
private static final String KEYED_STATE_OPER_NAME = 
"ArtificalKeyedStateMapper";
private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
-   private static final String FAILURE_MAPPER_NAME = 
"ExceptionThrowingFailureMapper";
+   private static final String FAILURE_MAPPER_NAME = "FailureMapper";
 
public static void main(String[] args) throws Exception {
 

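As background for the rename above: a failure-injecting mapper of this kind throws a deliberately recognizable exception after a configured number of records so that restore-from-checkpoint behaviour can be exercised. The following is a minimal sketch, not the committed class; the real FailureMapper takes the additional constructor arguments visible in createFailureMapper above and also bounds how many attempts may fail, so the job eventually succeeds after restarts.

import org.apache.flink.api.common.functions.MapFunction;

// Sketch only; constructor parameters and class name are illustrative.
public class FailureMapperSketch<T> implements MapFunction<T, T> {

	private static final long serialVersionUID = 1L;

	private final long failAfterNumRecords;
	private long numProcessedRecords;

	public FailureMapperSketch(long failAfterNumRecords) {
		this.failAfterNumRecords = failAfterNumRecords;
	}

	@Override
	public T map(T value) throws Exception {
		numProcessedRecords++;
		if (numProcessedRecords >= failAfterNumRecords) {
			// The message is deliberately recognizable so the test scripts can
			// ignore it when scanning logs for unexpected exceptions.
			throw new RuntimeException("Artificial Failure");
		}
		return value;
	}
}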
[flink] branch release-1.6 updated: [FLINK-10773] Harden resume externalized checkpoint end-to-end test

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new ed47ff8  [FLINK-10773] Harden resume externalized checkpoint end-to-end test
ed47ff8 is described below

commit ed47ff8d9128f19eb1a1f1b0db19fb95290a9182
Author: Till Rohrmann 
AuthorDate: Sun Nov 4 19:32:44 2018 +0100

[FLINK-10773] Harden resume externalized checkpoint end-to-end test

Ignore the 'Artificial Failure' exceptions and rename
ExceptionThrowingFailureMapper into FailureMapper to avoid false
positive exception matchings.
---
 .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java | 6 --
 .../apache/flink/streaming/tests/DataStreamAllroundTestProgram.java | 6 +++---
 .../{ExceptionThrowingFailureMapper.java => FailureMapper.java} | 4 ++--
 flink-end-to-end-tests/test-scripts/common.sh   | 1 +
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index fb92960..3c8d0ad 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -322,6 +322,8 @@ class DataStreamAllroundTestJobFactory {

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue( {
 
+   private static final long serialVersionUID = 
-3154419724891779938L;
+
@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
@@ -367,8 +369,8 @@ class DataStreamAllroundTestJobFactory {
return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), 
TEST_SIMULATE_FAILURE.defaultValue());
}
 
-   static MapFunction 
createExceptionThrowingFailureMapper(ParameterTool pt) {
-   return new ExceptionThrowingFailureMapper<>(
+   static MapFunction createFailureMapper(ParameterTool pt) {
+   return new FailureMapper<>(
pt.getLong(
TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),

TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 513d0cf..4372175 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -40,7 +40,7 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -67,7 +67,7 @@ public class DataStreamAllroundTestProgram {
private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
private static final String TIME_WINDOW_OPER_NAME = 
"TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
-   private static final String FAILURE_MAPPER_NAME = 
"ExceptionThrowingFailureMapper";
+   private static final String FAILURE_MAPPER_NAME = "FailureMapper";
 
public static 

[flink] branch release-1.7 updated: [hotfix] [table] Refactor SqlToConverter configuration

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 75589e1  [hotfix] [table] Refactor SqlToConverter configuration
75589e1 is described below

commit 75589e155b24405aada2eedabef74f794d9d5be2
Author: Timo Walther 
AuthorDate: Tue Oct 16 14:49:00 2018 +0200

[hotfix] [table] Refactor SqlToConverter configuration

This closes #6857.
---
 .../apache/flink/table/api/TableEnvironment.scala  | 36 ++
 .../flink/table/calcite/FlinkPlannerImpl.scala |  4 +--
 .../expressions/utils/ExpressionTestBase.scala |  3 +-
 .../table/match/PatternTranslatorTestBase.scala|  3 +-
 4 files changed, 20 insertions(+), 26 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 26f9e50..e28a471 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -91,6 +91,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 .costFactory(new DataSetCostFactory)
 .typeSystem(new FlinkTypeSystem)
 .operatorTable(getSqlOperatorTable)
+.sqlToRelConverterConfig(getSqlToRelConverterConfig)
 // set the executor to evaluate constant expressions
 .executor(new ExpressionReducer(config))
 .build
@@ -109,15 +110,6 @@ abstract class TableEnvironment(val config: TableConfig) {
   // registered external catalog names -> catalog
   private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
 
-  // configuration for SqlToRelConverter
-  private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = {
-val calciteConfig = config.getCalciteConfig
-calciteConfig.getSqlToRelConverterConfig match {
-  case Some(c) => c
-  case None => getSqlToRelConverterConfig
-}
-  }
-
   /** Returns the table config to define the runtime behavior of the Table 
API. */
   def getConfig: TableConfig = config
 
@@ -132,11 +124,18 @@ abstract class TableEnvironment(val config: TableConfig) {
 * Returns the SqlToRelConverter config.
 */
   protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
-SqlToRelConverter.configBuilder()
-  .withTrimUnusedFields(false)
-  .withConvertTableAccess(false)
-  .withInSubQueryThreshold(Integer.MAX_VALUE)
-  .build()
+val calciteConfig = config.getCalciteConfig
+calciteConfig.getSqlToRelConverterConfig match {
+
+  case None =>
+SqlToRelConverter.configBuilder()
+  .withTrimUnusedFields(false)
+  .withConvertTableAccess(false)
+  .withInSubQueryThreshold(Integer.MAX_VALUE)
+  .build()
+
+  case Some(c) => c
+}
   }
 
   /**
@@ -717,8 +716,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 val planner = new FlinkPlannerImpl(
   getFrameworkConfig,
   getPlanner,
-  getTypeFactory,
-  sqlToRelConverterConfig)
+  getTypeFactory)
 planner.getCompletionHints(statement, position)
   }
 
@@ -740,8 +738,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @return The result of the query as Table
 */
   def sqlQuery(query: String): Table = {
-val planner = new FlinkPlannerImpl(
-  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
 if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
@@ -801,8 +798,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @param config The [[QueryConfig]] to use.
 */
   def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
-val planner = new FlinkPlannerImpl(
-  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(stmt)
 parsed match {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index ca30053..400279d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -48,8 +48,7 @@ import scala.collection.JavaConversions._
 class FlinkPlannerImpl(
 config: FrameworkConfig,
 planner: 

[flink] branch master updated: [hotfix] [table] Refactor SqlToConverter configuration

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 34af3a7  [hotfix] [table] Refactor SqlToConverter configuration
34af3a7 is described below

commit 34af3a746c24d14164a65c861233969ee9d0395e
Author: Timo Walther 
AuthorDate: Tue Oct 16 14:49:00 2018 +0200

[hotfix] [table] Refactor SqlToConverter configuration

This closes #6857.
---
 .../apache/flink/table/api/TableEnvironment.scala  | 36 ++
 .../flink/table/calcite/FlinkPlannerImpl.scala |  4 +--
 .../expressions/utils/ExpressionTestBase.scala |  3 +-
 .../table/match/PatternTranslatorTestBase.scala|  3 +-
 4 files changed, 20 insertions(+), 26 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 26f9e50..e28a471 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -91,6 +91,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 .costFactory(new DataSetCostFactory)
 .typeSystem(new FlinkTypeSystem)
 .operatorTable(getSqlOperatorTable)
+.sqlToRelConverterConfig(getSqlToRelConverterConfig)
 // set the executor to evaluate constant expressions
 .executor(new ExpressionReducer(config))
 .build
@@ -109,15 +110,6 @@ abstract class TableEnvironment(val config: TableConfig) {
   // registered external catalog names -> catalog
   private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
 
-  // configuration for SqlToRelConverter
-  private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = {
-val calciteConfig = config.getCalciteConfig
-calciteConfig.getSqlToRelConverterConfig match {
-  case Some(c) => c
-  case None => getSqlToRelConverterConfig
-}
-  }
-
   /** Returns the table config to define the runtime behavior of the Table 
API. */
   def getConfig: TableConfig = config
 
@@ -132,11 +124,18 @@ abstract class TableEnvironment(val config: TableConfig) {
 * Returns the SqlToRelConverter config.
 */
   protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
-SqlToRelConverter.configBuilder()
-  .withTrimUnusedFields(false)
-  .withConvertTableAccess(false)
-  .withInSubQueryThreshold(Integer.MAX_VALUE)
-  .build()
+val calciteConfig = config.getCalciteConfig
+calciteConfig.getSqlToRelConverterConfig match {
+
+  case None =>
+SqlToRelConverter.configBuilder()
+  .withTrimUnusedFields(false)
+  .withConvertTableAccess(false)
+  .withInSubQueryThreshold(Integer.MAX_VALUE)
+  .build()
+
+  case Some(c) => c
+}
   }
 
   /**
@@ -717,8 +716,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 val planner = new FlinkPlannerImpl(
   getFrameworkConfig,
   getPlanner,
-  getTypeFactory,
-  sqlToRelConverterConfig)
+  getTypeFactory)
 planner.getCompletionHints(statement, position)
   }
 
@@ -740,8 +738,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @return The result of the query as Table
 */
   def sqlQuery(query: String): Table = {
-val planner = new FlinkPlannerImpl(
-  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(query)
 if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
@@ -801,8 +798,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @param config The [[QueryConfig]] to use.
 */
   def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
-val planner = new FlinkPlannerImpl(
-  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
 val parsed = planner.parse(stmt)
 parsed match {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index ca30053..400279d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -48,8 +48,7 @@ import scala.collection.JavaConversions._
 class FlinkPlannerImpl(
 config: FrameworkConfig,
 planner: RelOptPlanner,
-   

[flink] 02/03: [hotfix] [table] Simplify time attribute handling for joins

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4d6d14543666e71130b723589f1239943ca118d4
Author: Timo Walther 
AuthorDate: Mon Nov 5 13:57:37 2018 +0100

[hotfix] [table] Simplify time attribute handling for joins
---
 .../plan/rules/datastream/DataStreamJoinRule.scala | 41 --
 .../DataStreamTemporalTableJoinRule.scala  | 13 +---
 .../stream/sql/validation/JoinValidationTest.scala | 88 ++
 3 files changed, 89 insertions(+), 53 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
index f51c088..ad749d2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
@@ -20,10 +20,8 @@ package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
@@ -40,45 +38,32 @@ class DataStreamJoinRule
 FlinkConventions.DATASTREAM,
 "DataStreamJoinRule") {
 
-  /**
-* Checks if an expression accesses a time attribute.
-*
-* @param expr The expression to check.
-* @param inputType The input type of the expression.
-* @return True, if the expression accesses a time attribute. False 
otherwise.
-*/
-  def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
-expr match {
-  case i: RexInputRef =>
-val accessedType = inputType.getFieldList.get(i.getIndex).getType
-FlinkTypeFactory.isTimeIndicatorType(accessedType)
-  case c: RexCall =>
-c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
-  case _ => false
-}
-  }
-
   override def matches(call: RelOptRuleCall): Boolean = {
 val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
 val joinInfo = join.analyzeCondition
 
-val (windowBounds, remainingPreds) = 
WindowJoinUtil.extractWindowBoundsFromPredicate(
+val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate(
   joinInfo.getRemaining(join.getCluster.getRexBuilder),
   join.getLeft.getRowType.getFieldCount,
   join.getRowType,
   join.getCluster.getRexBuilder,
   TableConfig.DEFAULT)
 
-// remaining predicate must not access time attributes
-val remainingPredsAccessTime = remainingPreds.isDefined &&
-  accessesTimeAttribute(remainingPreds.get, join.getRowType)
+if (windowBounds.isDefined) {
+  return false
+}
 
-// Check that no event-time attributes are in the output because 
non-window join is unbounded
-// and we don't know how much to hold back watermarks.
+// Check that no event-time attributes are in the outputs (composed of two 
inputs)
+// because non-window join is unbounded and we don't know how much to hold 
back watermarks.
 val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
   .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
 
-windowBounds.isEmpty && !remainingPredsAccessTime && !rowTimeAttrInOutput
+if (rowTimeAttrInOutput) {
+  throw new TableException(
+"Rowtime attributes must not be in the input rows of a regular join. " 
+
+"As a workaround you can cast the time attributes of input tables to 
TIMESTAMP before.")
+}
+true
   }
 
   override def convert(rel: RelNode): RelNode = {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
index 94ff19c..5bdea53 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
@@ -22,12 +22,10 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import 

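As a hedged illustration of the workaround named in the new exception message (casting time attributes to TIMESTAMP before a regular join): the table and column names below are hypothetical, and the snippet assumes tables Orders (with a rowtime attribute named rowtime) and Payments are already registered on a 1.7-era StreamTableEnvironment.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Illustrative only; not part of the commit above.
public class RegularJoinRowtimeWorkaround {

	public static Table joinWithoutRowtimeInOutput(StreamTableEnvironment tableEnv) {
		// CAST(... AS TIMESTAMP) materializes the rowtime attribute, so the
		// regular (non-windowed) join no longer exposes a time attribute in its output.
		return tableEnv.sqlQuery(
			"SELECT o.id, p.amount, CAST(o.rowtime AS TIMESTAMP) AS orderTime " +
			"FROM Orders AS o JOIN Payments AS p ON o.id = p.orderId");
	}
}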
[flink] branch release-1.7 updated (18c1579 -> 9c0fdde)

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


 from 18c1579  [FLINK-10793][ttl] Change visibility of TtlValue and TtlSerializer to public for external tools
 new b7a76e0  [FLINK-8897] [table] Reintroduce materialization of time attributes in filters
 new 4d6d145  [hotfix] [table] Simplify time attribute handling for joins
 new 9c0fdde  [hotfix] [table] Move utility method down in JoinTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/calcite/RelTimeIndicatorConverter.scala  |  82 --
 .../plan/rules/datastream/DataStreamJoinRule.scala |  41 +++
 .../DataStreamTemporalTableJoinRule.scala  |  13 +--
 .../datastream/DataStreamWindowJoinRule.scala  |   2 +-
 .../flink/table/runtime/join/WindowJoinUtil.scala  |  12 +-
 .../flink/table/api/stream/sql/JoinTest.scala  | 124 +++--
 .../stream/sql/validation/JoinValidationTest.scala |  88 ---
 .../flink/table/api/stream/table/JoinTest.scala|  39 ---
 .../table/plan/TimeIndicatorConversionTest.scala   |   2 +-
 .../runtime/stream/TimeAttributesITCase.scala  |  46 
 10 files changed, 282 insertions(+), 167 deletions(-)



[flink] 03/03: [hotfix] [table] Move utility method down in JoinTest

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c0fddecb15536e4624c5b88af91ce19ee5ff622
Author: Timo Walther 
AuthorDate: Mon Nov 5 13:58:12 2018 +0100

[hotfix] [table] Move utility method down in JoinTest
---
 .../flink/table/api/stream/sql/JoinTest.scala  | 58 +++---
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index f435113..37d5bc1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -790,35 +790,6 @@ class JoinTest extends TableTestBase {
   "AND(=($0, $4), >($2, $6))")
   }
 
-  private def verifyTimeBoundary(
-  timeSql: String,
-  expLeftSize: Long,
-  expRightSize: Long,
-  expTimeType: String): Unit = {
-val query =
-  "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a 
and " + timeSql
-
-val resultTable = streamUtil.tableEnv.sqlQuery(query)
-val relNode = RelTimeIndicatorConverter.convert(
-  resultTable.getRelNode,
-  streamUtil.tableEnv.getRelBuilder.getRexBuilder)
-val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
-val (windowBounds, _) =
-  WindowJoinUtil.extractWindowBoundsFromPredicate(
-joinNode.getCondition,
-4,
-joinNode.getRowType,
-joinNode.getCluster.getRexBuilder,
-streamUtil.tableEnv.getConfig)
-
-val timeTypeStr =
-  if (windowBounds.get.isEventTime) "rowtime"
-  else  "proctime"
-assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
-assertEquals(expRightSize, windowBounds.get.leftUpperBound)
-assertEquals(expTimeType, timeTypeStr)
-  }
-
   @Test
   def testLeftOuterJoinEquiPred(): Unit = {
 val util = streamTestUtil()
@@ -1009,6 +980,35 @@ class JoinTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  private def verifyTimeBoundary(
+  timeSql: String,
+  expLeftSize: Long,
+  expRightSize: Long,
+  expTimeType: String): Unit = {
+val query =
+  "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a 
and " + timeSql
+
+val resultTable = streamUtil.tableEnv.sqlQuery(query)
+val relNode = RelTimeIndicatorConverter.convert(
+  resultTable.getRelNode,
+  streamUtil.tableEnv.getRelBuilder.getRexBuilder)
+val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+val (windowBounds, _) =
+  WindowJoinUtil.extractWindowBoundsFromPredicate(
+joinNode.getCondition,
+4,
+joinNode.getRowType,
+joinNode.getCluster.getRexBuilder,
+streamUtil.tableEnv.getConfig)
+
+val timeTypeStr =
+  if (windowBounds.get.isEventTime) "rowtime"
+  else  "proctime"
+assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
+assertEquals(expRightSize, windowBounds.get.leftUpperBound)
+assertEquals(expTimeType, timeTypeStr)
+  }
+
   private def verifyRemainConditionConvert(
   query: String,
   expectCondStr: String): Unit = {



[flink] 01/03: [FLINK-8897] [table] Reintroduce materialization of time attributes in filters

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7a76e0afd12b8c84c5e3a580e77fe6b215f150b
Author: Timo Walther 
AuthorDate: Wed Oct 31 15:27:47 2018 +0100

[FLINK-8897] [table] Reintroduce materialization of time attributes in filters
---
 .../table/calcite/RelTimeIndicatorConverter.scala  | 82 +-
 .../plan/rules/datastream/DataStreamJoinRule.scala |  2 +-
 .../datastream/DataStreamWindowJoinRule.scala  |  2 +-
 .../flink/table/runtime/join/WindowJoinUtil.scala  | 12 ++--
 .../flink/table/api/stream/sql/JoinTest.scala  | 72 ++-
 .../flink/table/api/stream/table/JoinTest.scala| 39 +-
 .../table/plan/TimeIndicatorConversionTest.scala   |  2 +-
 .../runtime/stream/TimeAttributesITCase.scala  | 46 
 8 files changed, 168 insertions(+), 89 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 41f0fc5..c1bcf14 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelNode, RelShuttle}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 import org.apache.flink.table.api.{TableException, ValidationException}
@@ -100,13 +101,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(matchRel: LogicalMatch): RelNode = {
 // visit children and update inputs
 val input = matchRel.getInput.accept(this)
-
-// check if input field contains time indicator type
-// materialize field if no time indicator is present anymore
-// if input field is already materialized, change to timestamp type
-val materializer = new RexTimeIndicatorMaterializer(
-  rexBuilder,
-  input.getRowType.getFieldList.map(_.getType))
+val materializer = createMaterializer(input)
 
 // update input expressions
 val patternDefs = 
matchRel.getPatternDefinitions.mapValues(_.accept(materializer))
@@ -180,23 +175,16 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(filter: LogicalFilter): RelNode = {
 // visit children and update inputs
 val input = filter.getInput.accept(this)
+val materializer = createMaterializer(input)
 
-// We do not materialize time indicators in conditions because they can be 
locally evaluated.
-// Some conditions are evaluated by special operators (e.g., time window 
joins).
-// Time indicators in remaining conditions are materialized by Calc before 
the code generation.
-LogicalFilter.create(input, filter.getCondition)
+val condition = filter.getCondition.accept(materializer)
+LogicalFilter.create(input, condition)
   }
 
   override def visit(project: LogicalProject): RelNode = {
 // visit children and update inputs
 val input = project.getInput.accept(this)
-
-// check if input field contains time indicator type
-// materialize field if no time indicator is present anymore
-// if input field is already materialized, change to timestamp type
-val materializer = new RexTimeIndicatorMaterializer(
-  rexBuilder,
-  input.getRowType.getFieldList.map(_.getType))
+val materializer = createMaterializer(input)
 
 val projects = project.getProjects.map(_.accept(materializer))
 val fieldNames = project.getRowType.getFieldNames
@@ -206,8 +194,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(join: LogicalJoin): RelNode = {
 val left = join.getLeft.accept(this)
 val right = join.getRight.accept(this)
+val materializer = createMaterializer(left, right)
 
-LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, 
join.getJoinType)
+LogicalJoin.create(
+  left,
+  right,
+  join.getCondition.accept(materializer),
+  join.getVariablesSet,
+  join.getJoinType)
   }
 
   def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
@@ -229,19 +223,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   case scan: LogicalTableFunctionScan =>
 // visit children and update inputs
 val scanInputs = scan.getInputs.map(_.accept(this))
-
-// check if input field contains time indicator type
-// materialize field 

[flink] branch master updated (e1d975d -> 9f31d5c)

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


 from e1d975d  [FLINK-10793][ttl] Change visibility of TtlValue and TtlSerializer to public for external tools
 new fa256b2  [FLINK-8897] [table] Reintroduce materialization of time attributes in filters
 new d041baa  [hotfix] [table] Simplify time attribute handling for joins
 new 9f31d5c  [hotfix] [table] Move utility method down in JoinTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/calcite/RelTimeIndicatorConverter.scala  |  82 --
 .../plan/rules/datastream/DataStreamJoinRule.scala |  41 +++
 .../DataStreamTemporalTableJoinRule.scala  |  13 +--
 .../datastream/DataStreamWindowJoinRule.scala  |   2 +-
 .../flink/table/runtime/join/WindowJoinUtil.scala  |  12 +-
 .../flink/table/api/stream/sql/JoinTest.scala  | 124 +++--
 .../stream/sql/validation/JoinValidationTest.scala |  88 ---
 .../flink/table/api/stream/table/JoinTest.scala|  39 ---
 .../table/plan/TimeIndicatorConversionTest.scala   |   2 +-
 .../runtime/stream/TimeAttributesITCase.scala  |  46 
 10 files changed, 282 insertions(+), 167 deletions(-)



[flink] 02/03: [hotfix] [table] Simplify time attribute handling for joins

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d041baaf3f16d58105d958ba6c2928fb143298f2
Author: Timo Walther 
AuthorDate: Mon Nov 5 13:57:37 2018 +0100

[hotfix] [table] Simplify time attribute handling for joins
---
 .../plan/rules/datastream/DataStreamJoinRule.scala | 41 --
 .../DataStreamTemporalTableJoinRule.scala  | 13 +---
 .../stream/sql/validation/JoinValidationTest.scala | 88 ++
 3 files changed, 89 insertions(+), 53 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
index f51c088..ad749d2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
@@ -20,10 +20,8 @@ package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
@@ -40,45 +38,32 @@ class DataStreamJoinRule
 FlinkConventions.DATASTREAM,
 "DataStreamJoinRule") {
 
-  /**
-* Checks if an expression accesses a time attribute.
-*
-* @param expr The expression to check.
-* @param inputType The input type of the expression.
-* @return True, if the expression accesses a time attribute. False 
otherwise.
-*/
-  def accessesTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
-expr match {
-  case i: RexInputRef =>
-val accessedType = inputType.getFieldList.get(i.getIndex).getType
-FlinkTypeFactory.isTimeIndicatorType(accessedType)
-  case c: RexCall =>
-c.operands.asScala.exists(accessesTimeAttribute(_, inputType))
-  case _ => false
-}
-  }
-
   override def matches(call: RelOptRuleCall): Boolean = {
 val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
 val joinInfo = join.analyzeCondition
 
-val (windowBounds, remainingPreds) = 
WindowJoinUtil.extractWindowBoundsFromPredicate(
+val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate(
   joinInfo.getRemaining(join.getCluster.getRexBuilder),
   join.getLeft.getRowType.getFieldCount,
   join.getRowType,
   join.getCluster.getRexBuilder,
   TableConfig.DEFAULT)
 
-// remaining predicate must not access time attributes
-val remainingPredsAccessTime = remainingPreds.isDefined &&
-  accessesTimeAttribute(remainingPreds.get, join.getRowType)
+if (windowBounds.isDefined) {
+  return false
+}
 
-// Check that no event-time attributes are in the output because non-window join is unbounded
-// and we don't know how much to hold back watermarks.
+// Check that no event-time attributes are in the outputs (composed of two inputs)
+// because non-window join is unbounded and we don't know how much to hold back watermarks.
 val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
   .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
 
-windowBounds.isEmpty && !remainingPredsAccessTime && !rowTimeAttrInOutput
+if (rowTimeAttrInOutput) {
+  throw new TableException(
+    "Rowtime attributes must not be in the input rows of a regular join. " +
+    "As a workaround you can cast the time attributes of input tables to TIMESTAMP before.")
+}
+true
   }
 
   override def convert(rel: RelNode): RelNode = {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
index 94ff19c..5bdea53 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamTemporalTableJoinRule.scala
@@ -22,12 +22,10 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 

[flink] 03/03: [hotfix] [table] Move utility method down in JoinTest

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f31d5c76086943d92cbaa27b035c351d9ac3dc8
Author: Timo Walther 
AuthorDate: Mon Nov 5 13:58:12 2018 +0100

[hotfix] [table] Move utility method down in JoinTest
---
 .../flink/table/api/stream/sql/JoinTest.scala  | 58 +++---
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index f435113..37d5bc1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -790,35 +790,6 @@ class JoinTest extends TableTestBase {
   "AND(=($0, $4), >($2, $6))")
   }
 
-  private def verifyTimeBoundary(
-  timeSql: String,
-  expLeftSize: Long,
-  expRightSize: Long,
-  expTimeType: String): Unit = {
-val query =
-  "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a 
and " + timeSql
-
-val resultTable = streamUtil.tableEnv.sqlQuery(query)
-val relNode = RelTimeIndicatorConverter.convert(
-  resultTable.getRelNode,
-  streamUtil.tableEnv.getRelBuilder.getRexBuilder)
-val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
-val (windowBounds, _) =
-  WindowJoinUtil.extractWindowBoundsFromPredicate(
-joinNode.getCondition,
-4,
-joinNode.getRowType,
-joinNode.getCluster.getRexBuilder,
-streamUtil.tableEnv.getConfig)
-
-val timeTypeStr =
-  if (windowBounds.get.isEventTime) "rowtime"
-  else  "proctime"
-assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
-assertEquals(expRightSize, windowBounds.get.leftUpperBound)
-assertEquals(expTimeType, timeTypeStr)
-  }
-
   @Test
   def testLeftOuterJoinEquiPred(): Unit = {
 val util = streamTestUtil()
@@ -1009,6 +980,35 @@ class JoinTest extends TableTestBase {
 util.verifyTable(result, expected)
   }
 
+  private def verifyTimeBoundary(
+  timeSql: String,
+  expLeftSize: Long,
+  expRightSize: Long,
+  expTimeType: String): Unit = {
+val query =
+  "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a 
and " + timeSql
+
+val resultTable = streamUtil.tableEnv.sqlQuery(query)
+val relNode = RelTimeIndicatorConverter.convert(
+  resultTable.getRelNode,
+  streamUtil.tableEnv.getRelBuilder.getRexBuilder)
+val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
+val (windowBounds, _) =
+  WindowJoinUtil.extractWindowBoundsFromPredicate(
+joinNode.getCondition,
+4,
+joinNode.getRowType,
+joinNode.getCluster.getRexBuilder,
+streamUtil.tableEnv.getConfig)
+
+val timeTypeStr =
+  if (windowBounds.get.isEventTime) "rowtime"
+  else  "proctime"
+assertEquals(expLeftSize, windowBounds.get.leftLowerBound)
+assertEquals(expRightSize, windowBounds.get.leftUpperBound)
+assertEquals(expTimeType, timeTypeStr)
+  }
+
   private def verifyRemainConditionConvert(
   query: String,
   expectCondStr: String): Unit = {



[flink] 01/03: [FLINK-8897] [table] Reintroduce materialization of time attributes in filters

2018-11-05 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa256b2a8983d899f71ba8bbda415db9a57f6c23
Author: Timo Walther 
AuthorDate: Wed Oct 31 15:27:47 2018 +0100

[FLINK-8897] [table] Reintroduce materialization of time attributes in 
filters
---
 .../table/calcite/RelTimeIndicatorConverter.scala  | 82 +-
 .../plan/rules/datastream/DataStreamJoinRule.scala |  2 +-
 .../datastream/DataStreamWindowJoinRule.scala  |  2 +-
 .../flink/table/runtime/join/WindowJoinUtil.scala  | 12 ++--
 .../flink/table/api/stream/sql/JoinTest.scala  | 72 ++-
 .../flink/table/api/stream/table/JoinTest.scala| 39 +-
 .../table/plan/TimeIndicatorConversionTest.scala   |  2 +-
 .../runtime/stream/TimeAttributesITCase.scala  | 46 
 8 files changed, 168 insertions(+), 89 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 41f0fc5..c1bcf14 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelNode, RelShuttle}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 import org.apache.flink.table.api.{TableException, ValidationException}
@@ -100,13 +101,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(matchRel: LogicalMatch): RelNode = {
 // visit children and update inputs
 val input = matchRel.getInput.accept(this)
-
-// check if input field contains time indicator type
-// materialize field if no time indicator is present anymore
-// if input field is already materialized, change to timestamp type
-val materializer = new RexTimeIndicatorMaterializer(
-  rexBuilder,
-  input.getRowType.getFieldList.map(_.getType))
+val materializer = createMaterializer(input)
 
 // update input expressions
 val patternDefs = 
matchRel.getPatternDefinitions.mapValues(_.accept(materializer))
@@ -180,23 +175,16 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(filter: LogicalFilter): RelNode = {
 // visit children and update inputs
 val input = filter.getInput.accept(this)
+val materializer = createMaterializer(input)
 
-// We do not materialize time indicators in conditions because they can be locally evaluated.
-// Some conditions are evaluated by special operators (e.g., time window joins).
-// Time indicators in remaining conditions are materialized by Calc before the code generation.
-LogicalFilter.create(input, filter.getCondition)
+val condition = filter.getCondition.accept(materializer)
+LogicalFilter.create(input, condition)
   }
 
   override def visit(project: LogicalProject): RelNode = {
 // visit children and update inputs
 val input = project.getInput.accept(this)
-
-// check if input field contains time indicator type
-// materialize field if no time indicator is present anymore
-// if input field is already materialized, change to timestamp type
-val materializer = new RexTimeIndicatorMaterializer(
-  rexBuilder,
-  input.getRowType.getFieldList.map(_.getType))
+val materializer = createMaterializer(input)
 
 val projects = project.getProjects.map(_.accept(materializer))
 val fieldNames = project.getRowType.getFieldNames
@@ -206,8 +194,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(join: LogicalJoin): RelNode = {
 val left = join.getLeft.accept(this)
 val right = join.getRight.accept(this)
+val materializer = createMaterializer(left, right)
 
-LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, 
join.getJoinType)
+LogicalJoin.create(
+  left,
+  right,
+  join.getCondition.accept(materializer),
+  join.getVariablesSet,
+  join.getJoinType)
   }
 
   def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
@@ -229,19 +223,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   case scan: LogicalTableFunctionScan =>
 // visit children and update inputs
 val scanInputs = scan.getInputs.map(_.accept(this))
-
-// check if input field contains time indicator type
-// materialize field if no 
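
The refactoring replaces each inline RexTimeIndicatorMaterializer construction
with a createMaterializer(...) helper that is not visible in this truncated
diff. A plausible sketch of such a helper, assuming it merely concatenates the
field types of the given inputs (the varargs signature is an assumption, not
the actual Flink source):

    // Sketch only: collect the field types of one or more inputs (e.g. both
    // sides of a join) and build the materializer from them. Assumes the same
    // implicit Java-to-Scala collection conversions as the surrounding file.
    private def createMaterializer(inputs: RelNode*): RexTimeIndicatorMaterializer = {
      val inputTypes = inputs.flatMap(_.getRowType.getFieldList.map(_.getType))
      new RexTimeIndicatorMaterializer(rexBuilder, inputTypes)
    }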

[flink] branch release-1.7 updated: [FLINK-10793][ttl] Change visibility of TtlValue and TtlSerializer to public for external tools

2018-11-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 18c1579  [FLINK-10793][ttl] Change visibility of TtlValue and TtlSerializer to public for external tools
18c1579 is described below

commit 18c157970edefececbd040c438ae25cc44d47c42
Author: Stefan Richter 
AuthorDate: Mon Nov 5 11:39:51 2018 +0100

[FLINK-10793][ttl] Change visibility of TtlValue and TtlSerializer to 
public for external tools

This closes #7021.
---
 .../org/apache/flink/runtime/state/ttl/TtlStateFactory.java| 10 ++
 .../main/java/org/apache/flink/runtime/state/ttl/TtlValue.java | 10 +-
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 45f4e3b..0a881c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -188,15 +188,17 @@ public class TtlStateFactory {
}
}
 
-   /** Serializer for user state value with TTL. */
-   private static class TtlSerializer extends 
CompositeSerializer> {
+   /**
+* Serializer for user state value with TTL. Visibility is public for 
usage with external tools.
+*/
+   public static class TtlSerializer extends 
CompositeSerializer> {
private static final long serialVersionUID = 
131020282727167064L;
 
-   TtlSerializer(TypeSerializer userValueSerializer) {
+   public TtlSerializer(TypeSerializer userValueSerializer) {
super(true, LongSerializer.INSTANCE, 
userValueSerializer);
}
 
-   TtlSerializer(PrecomputedParameters precomputed, 
TypeSerializer ... fieldSerializers) {
+   public TtlSerializer(PrecomputedParameters precomputed, 
TypeSerializer ... fieldSerializers) {
super(precomputed, fieldSerializers);
}
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index 48435d5..dba06d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -23,28 +23,28 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 
 /**
- * This class wraps user value of state with TTL.
+ * This class wraps user value of state with TTL. Visibility is public for 
usage with external tools.
  *
  * @param  Type of the user value of state with TTL
  */
-class TtlValue implements Serializable {
+public class TtlValue implements Serializable {
private static final long serialVersionUID = 5221129704201125020L;
 
@Nullable
private final T userValue;
private final long lastAccessTimestamp;
 
-   TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
+   public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
this.userValue = userValue;
this.lastAccessTimestamp = lastAccessTimestamp;
}
 
@Nullable
-   T getUserValue() {
+   public T getUserValue() {
return userValue;
}
 
-   long getLastAccessTimestamp() {
+   public long getLastAccessTimestamp() {
return lastAccessTimestamp;
}
 }
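
With TtlValue and its accessors now public, an external tool can construct and
inspect TTL-wrapped state values directly. A minimal sketch (the surrounding
tool is hypothetical; only the TtlValue API shown in the diff is assumed):

    import org.apache.flink.runtime.state.ttl.TtlValue

    object InspectTtlValue {
      def main(args: Array[String]): Unit = {
        // Wrap a user value together with its last-access timestamp,
        // the same shape Flink's TTL state uses internally.
        val wrapped = new TtlValue[String]("some user value", System.currentTimeMillis())

        // The now-public accessors expose both parts to external tooling.
        println(s"user value:  ${wrapped.getUserValue}")
        println(s"last access: ${wrapped.getLastAccessTimestamp}")
      }
    }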



[flink] branch release-1.7 updated: [FLINK-10627][e2e] Test s3 output for streaming file sink.

2018-11-05 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 6128ff8  [FLINK-10627][e2e] Test s3 output for streaming file sink.
6128ff8 is described below

commit 6128ff88fd47096c2056a90f07d1fd8eb486344d
Author: Andrey Zagrebin 
AuthorDate: Thu Oct 25 17:11:49 2018 +0200

[FLINK-10627][e2e] Test s3 output for streaming file sink.
---
 .../flink-e2e-test-utils/pom.xml   |  71 ++
 .../flink/streaming/tests/util/s3/S3QueryUtil.java |  92 
 .../streaming/tests/util/s3/S3UtilProgram.java | 225 +++
 flink-end-to-end-tests/pom.xml |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh|   1 +
 flink-end-to-end-tests/test-scripts/common.sh  |  54 ++---
 flink-end-to-end-tests/test-scripts/common_s3.sh   | 240 +
 .../test-scripts/test_shaded_hadoop_s3a.sh |  23 +-
 .../test-scripts/test_shaded_presto_s3.sh  |  21 +-
 .../test-scripts/test_streaming_file_sink.sh   |  80 ---
 10 files changed, 710 insertions(+), 98 deletions(-)

diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml 
b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
new file mode 100644
index 000..219d662
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
@@ -0,0 +1,71 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.7-SNAPSHOT
+   
+   4.0.0
+
+   flink-e2e-test-utils
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   
+   
+   com.amazonaws
+   aws-java-sdk-s3
+   1.11.437
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   S3UtilProgram
+   package
+   
+   shade
+   
+   
+   
S3UtilProgram
+   
+   
+   
org.apache.flink.streaming.tests.util.s3.S3UtilProgram
+   
+   
+   
+   
+   
+   
+   
+   
+
+
diff --git 
a/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
 
b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
new file mode 100644
index 000..781a8ed
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.util.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CSVInput;
+import com.amazonaws.services.s3.model.CSVOutput;
+import com.amazonaws.services.s3.model.CompressionType;
+import 

[flink] branch master updated: [FLINK-10627][e2e] Test s3 output for streaming file sink.

2018-11-05 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e9703e7  [FLINK-10627][e2e] Test s3 output for streaming file sink.
e9703e7 is described below

commit e9703e70e951f98e734c965569ce56cbb59fd710
Author: Andrey Zagrebin 
AuthorDate: Thu Oct 25 17:11:49 2018 +0200

[FLINK-10627][e2e] Test s3 output for streaming file sink.

This closes #6957.
---
 .../flink-e2e-test-utils/pom.xml   |  71 ++
 .../flink/streaming/tests/util/s3/S3QueryUtil.java |  92 
 .../streaming/tests/util/s3/S3UtilProgram.java | 225 +++
 flink-end-to-end-tests/pom.xml |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh|   1 +
 flink-end-to-end-tests/test-scripts/common.sh  |  54 ++---
 flink-end-to-end-tests/test-scripts/common_s3.sh   | 240 +
 .../test-scripts/test_shaded_hadoop_s3a.sh |  23 +-
 .../test-scripts/test_shaded_presto_s3.sh  |  21 +-
 .../test-scripts/test_streaming_file_sink.sh   |  80 ---
 10 files changed, 710 insertions(+), 98 deletions(-)

diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml 
b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
new file mode 100644
index 000..67988b3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
@@ -0,0 +1,71 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.8-SNAPSHOT
+   
+   4.0.0
+
+   flink-e2e-test-utils
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   
+   
+   com.amazonaws
+   aws-java-sdk-s3
+   1.11.437
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   S3UtilProgram
+   package
+   
+   shade
+   
+   
+   
S3UtilProgram
+   
+   
+   
org.apache.flink.streaming.tests.util.s3.S3UtilProgram
+   
+   
+   
+   
+   
+   
+   
+   
+
+
diff --git 
a/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
 
b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
new file mode 100644
index 000..781a8ed
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.util.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CSVInput;
+import com.amazonaws.services.s3.model.CSVOutput;
+import com.amazonaws.services.s3.model.CompressionType;
+import 
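
The S3QueryUtil above is truncated right after its imports; those imports
suggest the utility queries the sink output with S3 Select over CSV objects. A
rough, self-contained sketch of such a query against the aws-java-sdk-s3
version declared in the pom (bucket, key and SQL expression are made-up
placeholders, not values taken from the Flink utility):

    import com.amazonaws.services.s3.AmazonS3ClientBuilder
    import com.amazonaws.services.s3.model._

    object S3SelectSketch {
      def main(args: Array[String]): Unit = {
        val s3 = AmazonS3ClientBuilder.defaultClient()

        // SQL expression that S3 evaluates directly against the object.
        val request = new SelectObjectContentRequest()
        request.setBucketName("my-test-bucket")
        request.setKey("output/part-0-0")
        request.setExpression("SELECT count(*) FROM S3Object")
        request.setExpressionType("SQL")

        // The stored object is plain, uncompressed CSV ...
        val input = new InputSerialization()
        input.setCsv(new CSVInput())
        input.setCompressionType("NONE")
        request.setInputSerialization(input)

        // ... and the result should come back as CSV as well.
        val output = new OutputSerialization()
        output.setCsv(new CSVOutput())
        request.setOutputSerialization(output)

        val records = s3.selectObjectContent(request).getPayload.getRecordsInputStream
        println(scala.io.Source.fromInputStream(records).mkString)
      }
    }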

[flink] branch release-1.7 updated: [FLINK-10638][table] Invalid table scan resolution for temporal join queries

2018-11-05 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 97ac1ab  [FLINK-10638][table] Invalid table scan resolution for temporal join queries
97ac1ab is described below

commit 97ac1ab0cc513511a0b61b52663e451c0e52218b
Author: Piotr Nowojski 
AuthorDate: Thu Nov 1 12:19:14 2018 +0100

[FLINK-10638][table] Invalid table scan resolution for temporal join queries

Previously there was a strict, fixed order of applying the
LogicalCorrelateToTemporalTableJoinRule and TableScanRule rules. This caused
problems, since either rule can create new RelNodes that then have to be
subject to the other rule (imagine a deeply nested TemporalTableFunction that
references registered tables/views and other TemporalTableFunctions).

The solution to this problem is to run both of those rules in one
group/collection in the HepPlanner. Instead of applying one rule to the whole
tree and then the other rule, both rules are applied to a parent node before
descending deeper.
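
In Calcite this grouping amounts to registering the rules as one rule
collection within a single HepProgram step instead of as separate sequential
steps. A minimal sketch of the idea (rule set, match order and method name are
placeholders, not the exact Flink planner setup):

    import java.util.Arrays

    import org.apache.calcite.plan.RelOptRule
    import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
    import org.apache.calcite.rel.RelNode

    object HepGroupingSketch {
      def expandPlan(input: RelNode, rules: Seq[RelOptRule]): RelNode = {
        // One collection: the planner offers every rule in it to a node
        // before descending, so the rules can pick up each other's output.
        val program = new HepProgramBuilder()
          .addMatchOrder(HepMatchOrder.TOP_DOWN)
          .addRuleCollection(Arrays.asList(rules: _*))
          .build()

        val planner = new HepPlanner(program)
        planner.setRoot(input)
        planner.findBestExp()
      }
    }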
---
 .../flink/table/api/BatchTableEnvironment.scala|  5 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  8 +--
 .../apache/flink/table/api/TableEnvironment.scala  | 64 --
 .../flink/table/plan/rules/FlinkRuleSets.scala | 16 ++
 .../api/stream/sql/TemporalTableJoinTest.scala |  4 +-
 .../api/stream/table/TemporalTableJoinTest.scala   |  7 ++-
 .../runtime/stream/sql/TemporalJoinITCase.scala| 10 ++--
 7 files changed, 72 insertions(+), 42 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 6a7a921..99e9d7e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -449,9 +449,8 @@ abstract class BatchTableEnvironment(
 */
   private[flink] def optimize(relNode: RelNode): RelNode = {
 val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
 val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
 val logicalPlan = optimizeLogicalPlan(normalizedPlan)
 optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4973f34..8c6a1e0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -804,13 +804,13 @@ abstract class StreamTableEnvironment(
 */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): 
RelNode = {
 val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
 val planWithMaterializedTimeAttributes =
   RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
 val normalizedPlan = 
optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
 val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+
 val physicalPlan = optimizePhysicalPlan(logicalPlan, 
FlinkConventions.DATASTREAM)
 optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
   }
@@ -827,7 +827,7 @@ abstract class StreamTableEnvironment(
   } else {
 relNode
   }
-  runHepPlanner(
+  runHepPlannerSequentially(
 HepMatchOrder.BOTTOM_UP,
 decoRuleSet,
 planToDecorate,
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 58831d1..26f9e50 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 

[flink] branch master updated (b9351fc -> c58d37e)

2018-11-05 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b9351fc  [FLINK-10364][tests] Fix instability in NonHAQueryableStateFsBackendITCase#testMapState
 add c58d37e  [FLINK-10638][table] Invalid table scan resolution for temporal join queries

No new revisions were added by this update.

Summary of changes:
 .../flink/table/api/BatchTableEnvironment.scala|  5 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  8 +--
 .../apache/flink/table/api/TableEnvironment.scala  | 64 --
 .../flink/table/plan/rules/FlinkRuleSets.scala | 16 ++
 .../api/stream/sql/TemporalTableJoinTest.scala |  4 +-
 .../api/stream/table/TemporalTableJoinTest.scala   |  7 ++-
 .../runtime/stream/sql/TemporalJoinITCase.scala| 10 ++--
 7 files changed, 72 insertions(+), 42 deletions(-)



[flink] branch release-1.6 updated: [FLINK-10364][tests] Fix instability in NonHAQueryableStateFsBackendITCase#testMapState

2018-11-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new 4f3e125  [FLINK-10364][tests] Fix instability in NonHAQueryableStateFsBackendITCase#testMapState
4f3e125 is described below

commit 4f3e125aa00d731129de4049bee14907a7dfa277
Author: Stefan Richter 
AuthorDate: Wed Oct 31 11:57:56 2018 +0100

[FLINK-10364][tests] Fix instability in 
NonHAQueryableStateFsBackendITCase#testMapState

This closes #6975.
---
 .../queryablestate/itcases/AbstractQueryableStateTestBase.java| 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index e99a28b..83976f1 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -836,9 +836,11 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
false,
executor);
 
-   Tuple2 value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
-   assertEquals("Key mismatch", key, 
value.f0.intValue());
-   if (expected == value.f1) {
+   Tuple2 value =
+   
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+
+   if (value != null && value.f0 != null 
&& expected == value.f1) {
+   assertEquals("Key mismatch", 
key, value.f0.intValue());
success = true;
} else {
// Retry



[flink] branch master updated: [FLINK-10364][tests] Fix instability in NonHAQueryableStateFsBackendITCase#testMapState

2018-11-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b9351fc  [FLINK-10364][tests] Fix instability in NonHAQueryableStateFsBackendITCase#testMapState
b9351fc is described below

commit b9351fcf6d8ab5b071246ce76f2e66f569c94708
Author: Stefan Richter 
AuthorDate: Wed Oct 31 11:57:56 2018 +0100

[FLINK-10364][tests] Fix instability in 
NonHAQueryableStateFsBackendITCase#testMapState

This closes #6975.
---
 .../queryablestate/itcases/AbstractQueryableStateTestBase.java| 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index e99a28b..83976f1 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -836,9 +836,11 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
false,
executor);
 
-   Tuple2 value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
-   assertEquals("Key mismatch", key, 
value.f0.intValue());
-   if (expected == value.f1) {
+   Tuple2 value =
+   
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+
+   if (value != null && value.f0 != null 
&& expected == value.f1) {
+   assertEquals("Key mismatch", 
key, value.f0.intValue());
success = true;
} else {
// Retry



[flink] annotated tag release-1.7.0-rc1 created (now 9b2551e)

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to annotated tag release-1.7.0-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


  at 9b2551e  (tag)
 tagging 026d7368468cb2499a94e7ef71c4e0d70b783ca6 (commit)
 replaces pre-apache-rename
  by Till Rohrmann
  on Mon Nov 5 12:39:05 2018 +0100

- Log -
release-1.7.0-rc1
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiEENFF/bqllF4fQbsqJHzAlaals/9UFAlvgK9kACgkQHzAlaals
/9WGyg/+ORc6DgJu4p+bCnLb7GgzcbLXtfUoMyW8hXz67mQVJVad1eHsH0WcVB7k
iFqh26LN7YMSQQrI54uiWhm2LUP5GccqVGezMoDkEtkWiV4SDfNspPK9UABkjU6K
dHADbRl4Xe0ukQcuizk0OUkUVZZRRM3+GkkwlualIuhAk3+fg9g9LUw6CKs3P3YO
yLWZhgMpUkTAFWvK63lmX4zxq9jk7eeWh62eFRQ3b2t9Isb+Dta2iQxR0TzkitWW
om76yHX3/nsJDkYsWFMnKhcZQcXgS0g6tplb8wzC2ejM57pfpk85uQ20QnDvGoO6
ei5OjimdBhm61Yq3+77jQNG5EyWbzrTpSduT5mXxYH7qq+zn4k4vJZ4KEBDIP9el
lcYu0aWpOyfaZj+11PuRxQuRBFIUJkKBvF35bjk5Ybb3DYzmi6GYJtMkqUbv
vczav24HHSWPhTqCMcJUJcCvi8wqvJ+Nloh2y40L6Ax9Tb4U+bBU5JFzI6cB9NgM
+s2MYbMO7eBp2x6T1IjUJB9ASFDbyltMSqI2YtxOidRhm/s15eLsSjcrykipCkgT
AZtfEP5mBuNH/kO3RHYND1voHEnodnxMN/wwOc4kSvOF1fD15QyN7Klj4z25lYCJ
emSNu7EiVH3YVRsAXuh/cHc+Ci445tHrFWWtRYhYK41oxqnqUeI=
=CfN4
-END PGP SIGNATURE-
---

This annotated tag includes the following new commits:

 new 026d736  Commit for release 1.7.0

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink] 01/01: Commit for release 1.7.0

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to annotated tag release-1.7.0-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 026d7368468cb2499a94e7ef71c4e0d70b783ca6
Author: Till Rohrmann 
AuthorDate: Mon Nov 5 12:38:57 2018 +0100

Commit for release 1.7.0
---
 docs/_config.yml  | 4 ++--
 flink-annotations/pom.xml | 2 +-
 flink-clients/pom.xml | 2 +-
 flink-connectors/flink-connector-cassandra/pom.xml| 2 +-
 flink-connectors/flink-connector-elasticsearch-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch/pom.xml| 2 +-
 flink-connectors/flink-connector-elasticsearch2/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch5/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-connector-filesystem/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka-0.10/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka-0.11/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka-0.8/pom.xml| 2 +-
 flink-connectors/flink-connector-kafka-0.9/pom.xml| 2 +-
 flink-connectors/flink-connector-kafka-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/flink-connector-nifi/pom.xml | 2 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +-
 flink-connectors/flink-connector-twitter/pom.xml  | 2 +-
 flink-connectors/flink-hadoop-compatibility/pom.xml   | 2 +-
 flink-connectors/flink-hbase/pom.xml  | 2 +-
 flink-connectors/flink-hcatalog/pom.xml   | 2 +-
 flink-connectors/flink-jdbc/pom.xml   | 2 +-
 flink-connectors/flink-orc/pom.xml| 2 +-
 flink-connectors/pom.xml  | 2 +-
 flink-container/pom.xml   | 2 +-
 flink-contrib/flink-connector-wikiedits/pom.xml   | 2 +-
 flink-contrib/flink-storm-examples/pom.xml| 2 +-
 flink-contrib/flink-storm/pom.xml | 2 +-
 flink-contrib/pom.xml | 2 +-
 flink-core/pom.xml| 2 +-
 flink-dist/pom.xml| 2 +-
 flink-docs/pom.xml| 2 +-
 flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +-
 flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +-
 flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-high-parallelism-iterations-test/pom.xml | 2 +-
 .../flink-local-recovery-and-allocation-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-queryable-state-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-quickstart-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-sql-client-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-state-evolution-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-stream-sql-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml| 2 +-
 flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml | 2 +-
 flink-end-to-end-tests/pom.xml| 2 +-
 flink-examples/flink-examples-batch/pom.xml   | 2 +-
 flink-examples/flink-examples-streaming/pom.xml   | 2 +-
 flink-examples/flink-examples-table/pom.xml   | 2 +-
 flink-examples/pom.xml| 2 +-
 flink-filesystems/flink-fs-hadoop-shaded/pom.xml

[flink] branch release-1.7 updated: [FLINK-10773] Harden resume externalized checkpoint end-to-end test

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 80bed3d  [FLINK-10773] Harden resume externalized checkpoint end-to-end test
80bed3d is described below

commit 80bed3d87b2722f0f6c04930b3ab1bdde860962c
Author: Till Rohrmann 
AuthorDate: Sun Nov 4 19:32:44 2018 +0100

[FLINK-10773] Harden resume externalized checkpoint end-to-end test

Ignore the 'Artificial Failure' exceptions and rename
ExceptionThrowingFailureMapper to FailureMapper to avoid false-positive
exception matches.
---
 .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java | 6 --
 .../apache/flink/streaming/tests/DataStreamAllroundTestProgram.java | 6 +++---
 .../{ExceptionThrowingFailureMapper.java => FailureMapper.java} | 4 ++--
 flink-end-to-end-tests/test-scripts/common.sh   | 1 +
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index fb92960..3c8d0ad 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -322,6 +322,8 @@ class DataStreamAllroundTestJobFactory {

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue( {
 
+   private static final long serialVersionUID = 
-3154419724891779938L;
+
@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
@@ -367,8 +369,8 @@ class DataStreamAllroundTestJobFactory {
return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), 
TEST_SIMULATE_FAILURE.defaultValue());
}
 
-   static MapFunction 
createExceptionThrowingFailureMapper(ParameterTool pt) {
-   return new ExceptionThrowingFailureMapper<>(
+   static MapFunction createFailureMapper(ParameterTool pt) {
+   return new FailureMapper<>(
pt.getLong(
TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),

TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 70fdade..b14e2af 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -41,7 +41,7 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -68,7 +68,7 @@ public class DataStreamAllroundTestProgram {
private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
private static final String TIME_WINDOW_OPER_NAME = 
"TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
-   private static final String FAILURE_MAPPER_NAME = 
"ExceptionThrowingFailureMapper";
+   private static final String FAILURE_MAPPER_NAME = "FailureMapper";
 
public static 

[flink] branch master updated: [FLINK-10773] Harden resume externalized checkpoint end-to-end test

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 31e0098  [FLINK-10773] Harden resume externalized checkpoint end-to-end test
31e0098 is described below

commit 31e0098abd267d1219d93a3564b65f69e3282bd7
Author: Till Rohrmann 
AuthorDate: Sun Nov 4 19:32:44 2018 +0100

[FLINK-10773] Harden resume externalized checkpoint end-to-end test

Ignore the 'Artificial Failure' exceptions and rename
ExceptionThrowingFailureMapper to FailureMapper to avoid false-positive
exception matches.
---
 .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java | 6 --
 .../apache/flink/streaming/tests/DataStreamAllroundTestProgram.java | 6 +++---
 .../{ExceptionThrowingFailureMapper.java => FailureMapper.java} | 4 ++--
 flink-end-to-end-tests/test-scripts/common.sh   | 1 +
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index fb92960..3c8d0ad 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -322,6 +322,8 @@ class DataStreamAllroundTestJobFactory {

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),

SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue( {
 
+   private static final long serialVersionUID = 
-3154419724891779938L;
+
@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
@@ -367,8 +369,8 @@ class DataStreamAllroundTestJobFactory {
return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), 
TEST_SIMULATE_FAILURE.defaultValue());
}
 
-   static MapFunction 
createExceptionThrowingFailureMapper(ParameterTool pt) {
-   return new ExceptionThrowingFailureMapper<>(
+   static MapFunction createFailureMapper(ParameterTool pt) {
+   return new FailureMapper<>(
pt.getLong(
TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),

TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 70fdade..b14e2af 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -41,7 +41,7 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -68,7 +68,7 @@ public class DataStreamAllroundTestProgram {
private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
private static final String TIME_WINDOW_OPER_NAME = 
"TumblingWindowOperator";
private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
-   private static final String FAILURE_MAPPER_NAME = 
"ExceptionThrowingFailureMapper";
+   private static final String FAILURE_MAPPER_NAME = "FailureMapper";
 
public static void 

[flink] branch release-1.7 updated: [FLINK-10772][release] Fix create_binary_release.sh

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 5c28c2c  [FLINK-10772][release] Fix create_binary_release.sh
5c28c2c is described below

commit 5c28c2c1f89535979ffc881eb4a3fd48fa0c6fa9
Author: Till Rohrmann 
AuthorDate: Sun Nov 4 18:38:17 2018 +0100

[FLINK-10772][release] Fix create_binary_release.sh

Remove the unnecessary call to change_scala_version.sh and remove the
-Dmaven.test.skip=true property. Removing the latter is necessary because the
property suppresses the compilation and packaging of test classes; it does
not, however, suppress the resolution of test dependencies, so the build then
fails because the required test dependencies have never been built.

This commit also removes the redundant call to build
flink-shaded/hadoop/flink-shaded-hadoop2-uber which is a dependency
of flink-dist anyway.
---
 tools/releasing/create_binary_release.sh | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tools/releasing/create_binary_release.sh 
b/tools/releasing/create_binary_release.sh
index 8146127..f4d7c57 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -78,8 +78,7 @@ make_binary_release() {
   fi
 
   # enable release profile here (to check for the maven version)
-  tools/change-scala-version.sh ${SCALA_VERSION}
-  $MVN clean package $FLAGS -Prelease -pl 
flink-shaded-hadoop/flink-shaded-hadoop2-uber,flink-dist -am -Dgpg.skip 
-Dcheckstyle.skip=true -DskipTests -Dmaven.test.skip=true
+  $MVN clean package $FLAGS -Prelease -pl flink-dist -am -Dgpg.skip 
-Dcheckstyle.skip=true -DskipTests
 
   cd flink-dist/target/flink-*-bin/
   tar czf "${dir_name}.tgz" flink-*



[flink] branch master updated: [FLINK-10772][release] Fix create_binary_release.sh

2018-11-05 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 742658c  [FLINK-10772][release] Fix create_binary_release.sh
742658c is described below

commit 742658c2cc64255a94d02ea1f1a69427d3c51fb9
Author: Till Rohrmann 
AuthorDate: Sun Nov 4 18:38:17 2018 +0100

[FLINK-10772][release] Fix create_binary_release.sh

Remove the unnecessary call to change_scala_version.sh and remove the
-Dmaven.test.skip=true property. Removing the latter is necessary because the
property suppresses the compilation and packaging of test classes; it does
not, however, suppress the resolution of test dependencies, so the build then
fails because the required test dependencies have never been built.

This commit also removes the redundant call to build
flink-shaded/hadoop/flink-shaded-hadoop2-uber which is a dependency
of flink-dist anyway.
---
 tools/releasing/create_binary_release.sh | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tools/releasing/create_binary_release.sh 
b/tools/releasing/create_binary_release.sh
index 8146127..f4d7c57 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -78,8 +78,7 @@ make_binary_release() {
   fi
 
   # enable release profile here (to check for the maven version)
-  tools/change-scala-version.sh ${SCALA_VERSION}
-  $MVN clean package $FLAGS -Prelease -pl 
flink-shaded-hadoop/flink-shaded-hadoop2-uber,flink-dist -am -Dgpg.skip 
-Dcheckstyle.skip=true -DskipTests -Dmaven.test.skip=true
+  $MVN clean package $FLAGS -Prelease -pl flink-dist -am -Dgpg.skip 
-Dcheckstyle.skip=true -DskipTests
 
   cd flink-dist/target/flink-*-bin/
   tar czf "${dir_name}.tgz" flink-*



[flink] branch release-1.7 updated: [FLINK-10368][e2e] Hardened kerberized yarn e2e test

2018-11-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 08cd6ea  [FLINK-10368][e2e] Hardened kerberized yarn e2e test
08cd6ea is described below

commit 08cd6ea7cd8afa8d2761dde521eb9a7bf21ec5e6
Author: Dawid Wysakowicz 
AuthorDate: Mon Nov 5 11:07:22 2018 +0100

[FLINK-10368][e2e] Hardened kerberized yarn e2e test

* wait for the whole bootstrapping script to execute on the master
node before submitting the job
* retry starting the Hadoop cluster; fail the test if the Hadoop
cluster could not be started
* check that all containers are up and running before submitting the
job
* reduce the memory requirements for the kerberized yarn test
---
 .../docker-hadoop-secure-cluster/bootstrap.sh  |   1 +
 .../config/yarn-site.xml   |   9 +-
 .../test-scripts/test_yarn_kerberos_docker.sh  | 139 ++---
 3 files changed, 100 insertions(+), 49 deletions(-)

diff --git 
a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh 
b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
index 7b5e50b..5b98b96 100755
--- 
a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
+++ 
b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
@@ -124,6 +124,7 @@ elif [ "$1" == "master" ]; then
 hdfs dfs -chown hadoop-user:hadoop-user /user/hadoop-user
 
 kdestroy
+echo "Finished master initialization"
 
 while true; do sleep 1000; done
 elif [ "$1" == "worker" ]; then
diff --git 
a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
 
b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
index 9b17acc..c7736a6 100644
--- 
a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
+++ 
b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
@@ -21,6 +21,11 @@ under the License.
 mapreduce_shuffle
 
 
+   
+   yarn.nodemanager.vmem-pmem-ratio
+   3
+   
+
 
 
@@ -33,12 +38,12 @@ under the License.
 are scheduled on one NM, which wouldn't provoke a previously fixed 
Kerberos keytab bug. -->
 
 yarn.nodemanager.resource.memory-mb
-4100
+2500
 
 
 
 yarn.scheduler.minimum-allocation-mb
-2000
+1000
 
 
 
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh 
b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
index c9ef15d..5f2dea2 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
@@ -24,7 +24,8 @@ FLINK_TARBALL_DIR=$TEST_DATA_DIR
 FLINK_TARBALL=flink.tar.gz
 FLINK_DIRNAME=$(basename $FLINK_DIR)
 
-MAX_RETRY_SECONDS=800
+MAX_RETRY_SECONDS=120
+CLUSTER_SETUP_RETRIES=3
 
 echo "Flink Tarball directory $FLINK_TARBALL_DIR"
 echo "Flink tarball filename $FLINK_TARBALL"
@@ -33,20 +34,6 @@ echo "End-to-end directory $END_TO_END_DIR"
 docker --version
 docker-compose --version
 
-mkdir -p $FLINK_TARBALL_DIR
-tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
-
-echo "Building Hadoop Docker container"
-until docker build --build-arg HADOOP_VERSION=2.8.4 -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile -t flink/docker-hadoop-secure-cluster:latest $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/; do
-# with all the downloading and ubuntu updating a lot of flakiness can happen, make sure
-# we don't immediately fail
-echo "Something went wrong while building the Docker image, retrying ..."
-sleep 2
-done
-
-echo "Starting Hadoop cluster"
-docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
-
 # make sure we stop our cluster at the end
 function cluster_shutdown {
   # don't call ourselves again for another signal interruption
@@ -60,12 +47,71 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+function start_hadoop_cluster() {
+echo "Starting Hadoop cluster"
+docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
+
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time -
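
The archived diff breaks off at the timeout computation. The following is a minimal, hedged sketch of how the retry-with-timeout behaviour described in the commit message could look; MAX_RETRY_SECONDS, CLUSTER_SETUP_RETRIES, start_hadoop_cluster() and the "Finished master initialization" marker all appear in the diff above, while the loop structure, the 5-second poll interval and the teardown-before-retry step are assumptions for illustration, not the committed code:

    function start_hadoop_cluster() {
        echo "Starting Hadoop cluster"
        docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d

        # wait for the kerberized master to finish bootstrapping, bounded by MAX_RETRY_SECONDS
        local start_time current_time time_diff
        start_time=$(date +%s)
        until docker logs master 2>&1 | grep -q "Finished master initialization"; do
            current_time=$(date +%s)
            time_diff=$((current_time - start_time))
            if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then
                echo "Master container did not finish bootstrapping within ${MAX_RETRY_SECONDS} seconds"
                return 1
            fi
            sleep 5
        done
        return 0
    }

    # retry the whole cluster setup a bounded number of times and fail the test otherwise
    attempt=1
    until start_hadoop_cluster; do
        if [ $attempt -ge $CLUSTER_SETUP_RETRIES ]; then
            echo "Could not start the Hadoop cluster after ${CLUSTER_SETUP_RETRIES} attempts, failing the test"
            exit 1
        fi
        echo "Hadoop cluster did not come up, tearing it down and retrying ..."
        docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml down
        attempt=$((attempt + 1))
    done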

[flink] branch release-1.5 updated: [FLINK-10368][e2e] Hardened kerberized yarn e2e test

2018-11-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
 new 42a84c7  [FLINK-10368][e2e] Hardened kerberized yarn e2e test
42a84c7 is described below

commit 42a84c7ea96e95ec9bfdf5fceb41ad841f44ba80
Author: Dawid Wysakowicz 
AuthorDate: Mon Nov 5 11:07:22 2018 +0100

[FLINK-10368][e2e] Hardened kerberized yarn e2e test

* wait for the whole bootstrapping script to execute on the master node before submitting the job
* retry starting the Hadoop cluster; fail the test if the cluster could not be started
* added a check that all containers are up and running before submitting the job
* reduced memory requirements for the kerberized YARN test
---
 .../docker-hadoop-secure-cluster/bootstrap.sh  |   1 +
 .../config/yarn-site.xml   |   9 +-
 .../test-scripts/test_yarn_kerberos_docker.sh  | 139 ++---
 3 files changed, 100 insertions(+), 49 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
index 7b5e50b..5b98b96 100755
--- a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
+++ b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
@@ -124,6 +124,7 @@ elif [ "$1" == "master" ]; then
 hdfs dfs -chown hadoop-user:hadoop-user /user/hadoop-user
 
 kdestroy
+echo "Finished master initialization"
 
 while true; do sleep 1000; done
 elif [ "$1" == "worker" ]; then
diff --git a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
index 9b17acc..c7736a6 100644
--- a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
+++ b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
@@ -21,6 +21,11 @@ under the License.
         <value>mapreduce_shuffle</value>
     </property>
 
+    <property>
+        <name>yarn.nodemanager.vmem-pmem-ratio</name>
+        <value>3</value>
+    </property>
+
     <property>
 
@@ -33,12 +38,12 @@ under the License.
         are scheduled on one NM, which wouldn't provoke a previously fixed Kerberos keytab bug. -->
     <property>
         <name>yarn.nodemanager.resource.memory-mb</name>
-        <value>4100</value>
+        <value>2500</value>
     </property>
 
     <property>
         <name>yarn.scheduler.minimum-allocation-mb</name>
-        <value>2000</value>
+        <value>1000</value>
     </property>
 
 
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
index c9ef15d..5f2dea2 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
@@ -24,7 +24,8 @@ FLINK_TARBALL_DIR=$TEST_DATA_DIR
 FLINK_TARBALL=flink.tar.gz
 FLINK_DIRNAME=$(basename $FLINK_DIR)
 
-MAX_RETRY_SECONDS=800
+MAX_RETRY_SECONDS=120
+CLUSTER_SETUP_RETRIES=3
 
 echo "Flink Tarball directory $FLINK_TARBALL_DIR"
 echo "Flink tarball filename $FLINK_TARBALL"
@@ -33,20 +34,6 @@ echo "End-to-end directory $END_TO_END_DIR"
 docker --version
 docker-compose --version
 
-mkdir -p $FLINK_TARBALL_DIR
-tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
-
-echo "Building Hadoop Docker container"
-until docker build --build-arg HADOOP_VERSION=2.8.4 -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile -t flink/docker-hadoop-secure-cluster:latest $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/; do
-# with all the downloading and ubuntu updating a lot of flakiness can happen, make sure
-# we don't immediately fail
-echo "Something went wrong while building the Docker image, retrying ..."
-sleep 2
-done
-
-echo "Starting Hadoop cluster"
-docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
-
 # make sure we stop our cluster at the end
 function cluster_shutdown {
   # don't call ourselves again for another signal interruption
@@ -60,12 +47,71 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+function start_hadoop_cluster() {
+echo "Starting Hadoop cluster"
+docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
+
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time -

[flink] branch release-1.6 updated: [FLINK-10368][e2e] Hardened kerberized yarn e2e test

2018-11-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
 new ddcdfa5  [FLINK-10368][e2e] Hardened kerberized yarn e2e test
ddcdfa5 is described below

commit ddcdfa5b8e89a7fb9bfe065bae376ff8571abf85
Author: Dawid Wysakowicz 
AuthorDate: Mon Nov 5 11:07:22 2018 +0100

[FLINK-10368][e2e] Hardened kerberized yarn e2e test

* wait for the whole bootstrapping script to execute on the master node before submitting the job
* retry starting the Hadoop cluster; fail the test if the cluster could not be started
* added a check that all containers are up and running before submitting the job
* reduced memory requirements for the kerberized YARN test
---
 .../docker-hadoop-secure-cluster/bootstrap.sh  |   1 +
 .../config/yarn-site.xml   |   9 +-
 .../test-scripts/test_yarn_kerberos_docker.sh  | 139 ++---
 3 files changed, 100 insertions(+), 49 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
index 7b5e50b..5b98b96 100755
--- a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
+++ b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
@@ -124,6 +124,7 @@ elif [ "$1" == "master" ]; then
 hdfs dfs -chown hadoop-user:hadoop-user /user/hadoop-user
 
 kdestroy
+echo "Finished master initialization"
 
 while true; do sleep 1000; done
 elif [ "$1" == "worker" ]; then
diff --git a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
index 9b17acc..c7736a6 100644
--- a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
+++ b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
@@ -21,6 +21,11 @@ under the License.
         <value>mapreduce_shuffle</value>
     </property>
 
+    <property>
+        <name>yarn.nodemanager.vmem-pmem-ratio</name>
+        <value>3</value>
+    </property>
+
     <property>
 
@@ -33,12 +38,12 @@ under the License.
         are scheduled on one NM, which wouldn't provoke a previously fixed Kerberos keytab bug. -->
     <property>
         <name>yarn.nodemanager.resource.memory-mb</name>
-        <value>4100</value>
+        <value>2500</value>
     </property>
 
     <property>
         <name>yarn.scheduler.minimum-allocation-mb</name>
-        <value>2000</value>
+        <value>1000</value>
     </property>
 
 
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
index c9ef15d..5f2dea2 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
@@ -24,7 +24,8 @@ FLINK_TARBALL_DIR=$TEST_DATA_DIR
 FLINK_TARBALL=flink.tar.gz
 FLINK_DIRNAME=$(basename $FLINK_DIR)
 
-MAX_RETRY_SECONDS=800
+MAX_RETRY_SECONDS=120
+CLUSTER_SETUP_RETRIES=3
 
 echo "Flink Tarball directory $FLINK_TARBALL_DIR"
 echo "Flink tarball filename $FLINK_TARBALL"
@@ -33,20 +34,6 @@ echo "End-to-end directory $END_TO_END_DIR"
 docker --version
 docker-compose --version
 
-mkdir -p $FLINK_TARBALL_DIR
-tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
-
-echo "Building Hadoop Docker container"
-until docker build --build-arg HADOOP_VERSION=2.8.4 -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile -t flink/docker-hadoop-secure-cluster:latest $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/; do
-# with all the downloading and ubuntu updating a lot of flakiness can happen, make sure
-# we don't immediately fail
-echo "Something went wrong while building the Docker image, retrying ..."
-sleep 2
-done
-
-echo "Starting Hadoop cluster"
-docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
-
 # make sure we stop our cluster at the end
 function cluster_shutdown {
   # don't call ourselves again for another signal interruption
@@ -60,12 +47,71 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+function start_hadoop_cluster() {
+echo "Starting Hadoop cluster"
+docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
+
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time -

[flink] branch master updated: [FLINK-10368][e2e] Hardened kerberized yarn e2e test

2018-11-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 413a771  [FLINK-10368][e2e] Hardened kerberized yarn e2e test
413a771 is described below

commit 413a77157caf25dbbfb8b0caaf2c9e12c7374d98
Author: Dawid Wysakowicz 
AuthorDate: Mon Nov 5 11:07:22 2018 +0100

[FLINK-10368][e2e] Hardened kerberized yarn e2e test

* wait for the whole bootstrapping script to execute on the master node before submitting the job
* retry starting the Hadoop cluster; fail the test if the cluster could not be started
* added a check that all containers are up and running before submitting the job
* reduced memory requirements for the kerberized YARN test
---
 .../docker-hadoop-secure-cluster/bootstrap.sh  |   1 +
 .../config/yarn-site.xml   |   9 +-
 .../test-scripts/test_yarn_kerberos_docker.sh  | 139 ++---
 3 files changed, 100 insertions(+), 49 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
index 7b5e50b..5b98b96 100755
--- a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
+++ b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh
@@ -124,6 +124,7 @@ elif [ "$1" == "master" ]; then
 hdfs dfs -chown hadoop-user:hadoop-user /user/hadoop-user
 
 kdestroy
+echo "Finished master initialization"
 
 while true; do sleep 1000; done
 elif [ "$1" == "worker" ]; then
diff --git a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
index 9b17acc..c7736a6 100644
--- a/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
+++ b/flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/yarn-site.xml
@@ -21,6 +21,11 @@ under the License.
         <value>mapreduce_shuffle</value>
     </property>
 
+    <property>
+        <name>yarn.nodemanager.vmem-pmem-ratio</name>
+        <value>3</value>
+    </property>
+
     <property>
 
@@ -33,12 +38,12 @@ under the License.
         are scheduled on one NM, which wouldn't provoke a previously fixed Kerberos keytab bug. -->
     <property>
         <name>yarn.nodemanager.resource.memory-mb</name>
-        <value>4100</value>
+        <value>2500</value>
     </property>
 
     <property>
         <name>yarn.scheduler.minimum-allocation-mb</name>
-        <value>2000</value>
+        <value>1000</value>
     </property>
 
 
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
index c9ef15d..5f2dea2 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh
@@ -24,7 +24,8 @@ FLINK_TARBALL_DIR=$TEST_DATA_DIR
 FLINK_TARBALL=flink.tar.gz
 FLINK_DIRNAME=$(basename $FLINK_DIR)
 
-MAX_RETRY_SECONDS=800
+MAX_RETRY_SECONDS=120
+CLUSTER_SETUP_RETRIES=3
 
 echo "Flink Tarball directory $FLINK_TARBALL_DIR"
 echo "Flink tarball filename $FLINK_TARBALL"
@@ -33,20 +34,6 @@ echo "End-to-end directory $END_TO_END_DIR"
 docker --version
 docker-compose --version
 
-mkdir -p $FLINK_TARBALL_DIR
-tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
-
-echo "Building Hadoop Docker container"
-until docker build --build-arg HADOOP_VERSION=2.8.4 -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile -t flink/docker-hadoop-secure-cluster:latest $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/; do
-# with all the downloading and ubuntu updating a lot of flakiness can happen, make sure
-# we don't immediately fail
-echo "Something went wrong while building the Docker image, retrying ..."
-sleep 2
-done
-
-echo "Starting Hadoop cluster"
-docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
-
 # make sure we stop our cluster at the end
 function cluster_shutdown {
   # don't call ourselves again for another signal interruption
@@ -60,12 +47,71 @@ function cluster_shutdown {
 trap cluster_shutdown INT
 trap cluster_shutdown EXIT
 
-until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do
-# we're retrying this one because we don't know yet if the container is ready
-echo "Uploading Flink tarball to docker master failed, retrying ..."
-sleep 5
+function start_hadoop_cluster() {
+echo "Starting Hadoop cluster"
+docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d
+
+# wait for kerberos to be set up
+start_time=$(date +%s)
+until docker logs master 2>&1 | grep -q "Finished master initialization"; do
+current_time=$(date +%s)
+time_diff=$((current_time -

[flink] branch release-1.7 updated: [FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Serializer

2018-11-05 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new b9a866e  [FLINK-10490][tests] OperatorSnapshotUtil should use 
SavepointV2Serializer
b9a866e is described below

commit b9a866e3ba534dfdc4fb63a956402b8526390e36
Author: Stefan Richter 
AuthorDate: Tue Oct 23 19:12:58 2018 +0200

[FLINK-10490][tests] OperatorSnapshotUtil should use SavepointV2Serializer

Please note that state written with OperatorSnapshotUtil before this commit
was written in the V1 format. Now the current V2 format is used.

This closes #6910.
---
 .../savepoint/SavepointV2Serializer.java   | 23 +++---
 .../flink/streaming/util/OperatorSnapshotUtil.java | 22 ++---
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index faee588..fa84077 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -67,7 +69,9 @@ import java.util.UUID;
  *  +--+-+-+--+---+
  * 
  */
-class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+@Internal
+@VisibleForTesting
+public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 
/** Random magic number for consistency checks */
private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
@@ -320,7 +324,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
keyedStateStream);
}
 
-   private static void serializeKeyedStateHandle(
+   @VisibleForTesting
+   public static void serializeKeyedStateHandle(
   KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
if (stateHandle == null) {
@@ -380,7 +385,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
return result;
}
 
-   private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+   @VisibleForTesting
+   public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
 
@@ -433,7 +439,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
}
}
 
-   private static void serializeOperatorStateHandle(
+   @VisibleForTesting
+   public static void serializeOperatorStateHandle(
   OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
if (stateHandle != null) {
@@ -461,7 +468,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
}
}
 
-   private static OperatorStateHandle deserializeOperatorStateHandle(
+   @VisibleForTesting
+   public static OperatorStateHandle deserializeOperatorStateHandle(
DataInputStream dis) throws IOException {
 
final int type = dis.readByte();
@@ -492,7 +500,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
}
}
 
-   private static void serializeStreamStateHandle(
+   @VisibleForTesting
+   public static void serializeStreamStateHandle(
   StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
if (stateHandle == null) {
@@ -518,7 +527,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
dos.flush();
}
 
-   private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+   public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
final int type = dis.read();
if (NULL_HANDLE == type) {
return null;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 1b5113d..53627d5 100644
---