[flink] branch master updated: [FLINK-13012][hive] Handle default partition name of Hive table

2019-07-29 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d81374  [FLINK-13012][hive] Handle default partition name of Hive table
1d81374 is described below

commit 1d81374ce344e7bde410792c8a2efd6f4e707b9f
Author: Rui Li 
AuthorDate: Thu Jul 11 21:08:22 2019 +0800

[FLINK-13012][hive] Handle default partition name of Hive table

This closes #9088
---
 .../connectors/hive/HiveTableOutputFormat.java | 15 +++-
 .../batch/connectors/hive/HiveTableSource.java | 11 ++-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 88 ++
 3 files changed, 109 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index acb1bf9..ade5830 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -141,6 +141,9 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase imp
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
 
+   // used when partition values is null or empty
+   private transient String defaultPartitionName;
+
public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, 
CatalogTable table, HiveTablePartition hiveTablePartition,
Properties 
tableProperties, boolean overwrite) {
super(jobConf.getCredentials());
@@ -298,6 +301,8 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase imp
rowObjectInspector = 
ObjectInspectorFactory.getStandardStructObjectInspector(

Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - 
partitionColumns.size()),
objectInspectors);
+   defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+   
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
}
}
 
@@ -310,10 +315,12 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase imp
// only need to check the dynamic partitions
final int numStaticPart = 
hiveTablePartition.getPartitionSpec().size();
for (int i = dynamicPartitionOffset; i < 
record.getArity(); i++) {
-   // TODO: seems Hive also just calls 
toString(), need further investigation to confirm
-   // TODO: validate partition value
-   String partVal = 
record.getField(i).toString();
-   dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partVal);
+   Object field = record.getField(i);
+   String partitionValue = field != null ? 
field.toString() : null;
+   if (partitionValue == null || 
partitionValue.isEmpty()) {
+   partitionValue = 
defaultPartitionName;
+   }
+   dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partitionValue);
}
String partName = 
Warehouse.makePartPath(dynPartSpec);
partitionWriter = 
partitionToWriter.get(partName);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 706442d..e0734f4 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -157,6 +157,8 @@ public class HiveTableSource extends 
InputFormatTableSource implements Part
String tableName = tablePath.getObjectName();
List partitionColNames = 
catalogTable.getPartitionKeys();
if (partitionColNames != null && 
partitionColNames.size()
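
The change above substitutes Hive's configured default partition name (hive.exec.default.partition.name, which falls back to __HIVE_DEFAULT_PARTITION__) for dynamic partition values that are null or empty; before the fix, a null field caused a NullPointerException in toString() and an empty string produced an invalid partition path. A minimal, hedged sketch of that substitution outside the Flink classes (the helper class and names below are illustrative only):

import java.util.LinkedHashMap;
import java.util.Map;

class DefaultPartitionNameSketch {

    // Hive's built-in fallback value for hive.exec.default.partition.name.
    private static final String DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__";

    // Null or empty dynamic-partition values map to the default partition name,
    // mirroring what the new per-field logic in HiveTableOutputFormat does.
    static String toPartitionValue(Object field, String defaultPartitionName) {
        String value = field != null ? field.toString() : null;
        return (value == null || value.isEmpty()) ? defaultPartitionName : value;
    }

    public static void main(String[] args) {
        Map<String, String> dynPartSpec = new LinkedHashMap<>();
        dynPartSpec.put("dt", toPartitionValue(null, DEFAULT_PARTITION_NAME));
        dynPartSpec.put("country", toPartitionValue("US", DEFAULT_PARTITION_NAME));
        System.out.println(dynPartSpec); // {dt=__HIVE_DEFAULT_PARTITION__, country=US}
    }
}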

[flink] branch release-1.9 updated: [FLINK-13012][hive] Handle default partition name of Hive table

2019-07-29 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 25e6143  [FLINK-13012][hive] Handle default partition name of Hive table
25e6143 is described below

commit 25e6143a3c425c47851cbd69a41d68fe53f62f91
Author: Rui Li 
AuthorDate: Thu Jul 11 21:08:22 2019 +0800

[FLINK-13012][hive] Handle default partition name of Hive table

This closes #9088
---
 .../connectors/hive/HiveTableOutputFormat.java | 15 +++-
 .../batch/connectors/hive/HiveTableSource.java | 11 ++-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 88 ++
 3 files changed, 109 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index acb1bf9..ade5830 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -141,6 +141,9 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase imp
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
 
+   // used when partition values is null or empty
+   private transient String defaultPartitionName;
+
public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, 
CatalogTable table, HiveTablePartition hiveTablePartition,
Properties 
tableProperties, boolean overwrite) {
super(jobConf.getCredentials());
@@ -298,6 +301,8 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase imp
rowObjectInspector = 
ObjectInspectorFactory.getStandardStructObjectInspector(

Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - 
partitionColumns.size()),
objectInspectors);
+   defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+   
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
}
}
 
@@ -310,10 +315,12 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase imp
// only need to check the dynamic partitions
final int numStaticPart = 
hiveTablePartition.getPartitionSpec().size();
for (int i = dynamicPartitionOffset; i < 
record.getArity(); i++) {
-   // TODO: seems Hive also just calls 
toString(), need further investigation to confirm
-   // TODO: validate partition value
-   String partVal = 
record.getField(i).toString();
-   dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partVal);
+   Object field = record.getField(i);
+   String partitionValue = field != null ? 
field.toString() : null;
+   if (partitionValue == null || 
partitionValue.isEmpty()) {
+   partitionValue = 
defaultPartitionName;
+   }
+   dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partitionValue);
}
String partName = 
Warehouse.makePartPath(dynPartSpec);
partitionWriter = 
partitionToWriter.get(partName);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 706442d..e0734f4 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -157,6 +157,8 @@ public class HiveTableSource extends 
InputFormatTableSource implements Part
String tableName = tablePath.getObjectName();
List partitionColNames = 
catalogTable.getPartitionKeys();
if (partitionColNames != null && 
partitionColNa

[flink] branch master updated: [FLINK-13451][tests] Remove use of Unsafe.defineClass() from CommonTestUtils

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c5b133e  [FLINK-13451][tests] Remove use of Unsafe.defineClass() from CommonTestUtils
c5b133e is described below

commit c5b133e6b13ea468763730e45f872b2370c2a206
Author: Stephan Ewen 
AuthorDate: Sun Jul 28 17:16:17 2019 +0200

[FLINK-13451][tests] Remove use of Unsafe.defineClass() from CommonTestUtils

The method Unsafe.defineClass() was removed in Java 11. To support Java 11, we rework the method "CommonTestUtils.createClassNotInClassPath()" to use a different mechanism.

This commit now writes the class byte code out to a temporary file and creates a new URLClassLoader that loads the class from that file. That solution is not a complete drop-in replacement, because it cannot add the class to an existing class loader, but can only create a new pair of (classloader & new-class-in-that-classloader). Because of that, the commit also adjusts the existing tests to work with that new mechanism.

This closes #9251
---
 .../kryo/KryoSerializerClassLoadingTest.java   |  23 +--
 .../runtime/kryo/KryoSerializerSnapshotTest.java   |  20 +--
 .../CheckpointSettingsSerializableTest.java|   7 +-
 .../runtime/executiongraph/ErrorInfoTest.java  |   6 +-
 .../flink/runtime/state/JavaSerializerTest.java|  22 +--
 .../flink/core/testutils/CommonTestUtils.java  | 177 +++--
 .../flink/core/testutils/CommonTestUtilsTest.java  |  53 ++
 7 files changed, 203 insertions(+), 105 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
index 9823e11..0d5e908 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerClassLoadingTest.java
@@ -23,14 +23,11 @@ import 
org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.core.testutils.CommonTestUtils;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
-
 import static org.junit.Assert.fail;
 
 /**
@@ -39,13 +36,9 @@ import static org.junit.Assert.fail;
  */
 public class KryoSerializerClassLoadingTest extends SerializerTestBase 
{
 
-   /** Class loader for the object that is not in the test class path */
-   private static final ClassLoader CLASS_LOADER =
-   new URLClassLoader(new URL[0], 
KryoSerializerClassLoadingTest.class.getClassLoader());
-
-   /** An object that is not in the test class path */
-   private static final Serializable OBJECT_OUT_OF_CLASSPATH =
-   
CommonTestUtils.createObjectForClassNotInClassPath(CLASS_LOADER);
+   /** Class loader and object that is not in the test class path. */
+   private static final CommonTestUtils.ObjectAndClassLoader 
OUTSIDE_CLASS_LOADING =
+   CommonTestUtils.createObjectFromNewClassLoader();
 
// 

 
@@ -54,7 +47,7 @@ public class KryoSerializerClassLoadingTest extends 
SerializerTestBase {
@Before
public void setupClassLoader() {
originalClassLoader = 
Thread.currentThread().getContextClassLoader();
-   Thread.currentThread().setContextClassLoader(CLASS_LOADER);
+   
Thread.currentThread().setContextClassLoader(OUTSIDE_CLASS_LOADING.getClassLoader());
}
 
@After
@@ -67,7 +60,7 @@ public class KryoSerializerClassLoadingTest extends 
SerializerTestBase {
@Test
public void guardTestAssumptions() {
try {
-   
Class.forName(OBJECT_OUT_OF_CLASSPATH.getClass().getName());
+   
Class.forName(OUTSIDE_CLASS_LOADING.getObject().getClass().getName());
fail("This test's assumptions are broken");
}
catch (ClassNotFoundException ignored) {
@@ -98,11 +91,11 @@ public class KryoSerializerClassLoadingTest extends 
SerializerTestBase {
new Integer(7),
 
// an object whose class is not on the classpath
-   OBJECT_OUT_OF_CLASSPATH,
+   OUTSIDE_CLASS_LOADING.getObject(),
 
// an object whose class IS on the classpa
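
The mechanism the commit message describes, materializing class bytes on disk and loading them through a fresh URLClassLoader, can be sketched as follows. This is a hedged, self-contained illustration, not the actual CommonTestUtils implementation; it compiles a throwaway source file with the JDK compiler (so it needs a JDK, not just a JRE), and all names in it are made up:

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class ClassOutsideClasspathSketch {
    public static void main(String[] args) throws Exception {
        // Write and compile a throwaway class into a temporary directory.
        Path tempDir = Files.createTempDirectory("class-not-in-classpath");
        Path source = tempDir.resolve("GeneratedOutsideClass.java");
        Files.write(source,
            "public class GeneratedOutsideClass implements java.io.Serializable {}"
                .getBytes(StandardCharsets.UTF_8));
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // null on a plain JRE
        compiler.run(null, null, null, source.toString());

        // A fresh class loader that only knows about tempDir, parented to the
        // regular application class loader.
        URLClassLoader loader = new URLClassLoader(
            new URL[] {tempDir.toUri().toURL()},
            ClassOutsideClasspathSketch.class.getClassLoader());

        Object instance = loader.loadClass("GeneratedOutsideClass")
            .getDeclaredConstructor().newInstance();
        System.out.println(instance.getClass().getClassLoader());

        // From the application class loader the class stays invisible, which is
        // what the adjusted tests rely on:
        // Class.forName("GeneratedOutsideClass") -> ClassNotFoundException
    }
}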

[flink] branch master updated: [FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy

2019-07-29 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b670b11  [FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy
b670b11 is described below

commit b670b11f303657aa8175ec933ee29b377cb9e087
Author: Timo Walther 
AuthorDate: Fri Jul 26 15:14:42 2019 +0200

[FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy

This closes #9241.
---
 .../flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
index 725f534..1ddbc31 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
@@ -36,13 +36,14 @@ public final class BoundedOutOfOrderTimestamps extends 
PeriodicWatermarkAssigner
private static final long serialVersionUID = 1L;
 
private final long delay;
-   private long maxTimestamp = Long.MIN_VALUE + 1;
+   private long maxTimestamp;
 
/**
 * @param delay The delay by which watermarks are behind the maximum 
observed timestamp.
 */
public BoundedOutOfOrderTimestamps(long delay) {
this.delay = delay;
+   maxTimestamp = Long.MIN_VALUE + delay;
}
 
@Override
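
The one-line change matters because the strategy derives its watermark from maxTimestamp minus the delay (assuming getWatermark() computes maxTimestamp - delay, which the truncated diff does not show). With the old initial value of Long.MIN_VALUE + 1, that subtraction underflows for any delay greater than one and wraps around to a huge positive watermark before the first record is seen. A small arithmetic sketch, independent of the Flink classes:

public class WatermarkInitSketch {
    public static void main(String[] args) {
        long delay = 5_000L;

        // Old initialization: subtracting the delay underflows and wraps around
        // to a huge positive value, i.e. a watermark far in the future.
        long oldInit = Long.MIN_VALUE + 1;
        System.out.println(oldInit - delay);  // 9223372036854770809

        // New initialization: before any record is seen, the derived watermark
        // stays at Long.MIN_VALUE, meaning "no progress yet".
        long newInit = Long.MIN_VALUE + delay;
        System.out.println(newInit - delay);  // -9223372036854775808 (Long.MIN_VALUE)
    }
}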



[flink] branch release-1.9 updated: [FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy

2019-07-29 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 0224d9b  [FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy
0224d9b is described below

commit 0224d9bc0a773633943282f5268770a8063a87a2
Author: Timo Walther 
AuthorDate: Fri Jul 26 15:14:42 2019 +0200

[FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy

This closes #9241.
---
 .../flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
index 725f534..1ddbc31 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
@@ -36,13 +36,14 @@ public final class BoundedOutOfOrderTimestamps extends 
PeriodicWatermarkAssigner
private static final long serialVersionUID = 1L;
 
private final long delay;
-   private long maxTimestamp = Long.MIN_VALUE + 1;
+   private long maxTimestamp;
 
/**
 * @param delay The delay by which watermarks are behind the maximum 
observed timestamp.
 */
public BoundedOutOfOrderTimestamps(long delay) {
this.delay = delay;
+   maxTimestamp = Long.MIN_VALUE + delay;
}
 
@Override



[flink] branch master updated (b670b11 -> 12118b9)

2019-07-29 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b670b11  [FLINK-13429][table-common] Fix BoundedOutOfOrderTimestamps watermark strategy
 add 12118b9  [FLINK-13387][WebUI] Fix log download for old UI

No new revisions were added by this update.

Summary of changes:
 .../old-version/partials/taskmanager/taskmanager.log.html | 4 ++--
 .../old-version/partials/taskmanager/taskmanager.stdout.html  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)



[flink] branch release-1.9 updated: [FLINK-13387][WebUI] Fix log download for old UI

2019-07-29 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 5077f18  [FLINK-13387][WebUI] Fix log download for old UI
5077f18 is described below

commit 5077f186c5fc0644f06145bb5fa7796f54be8900
Author: Chesnay Schepler 
AuthorDate: Fri Jul 26 13:41:34 2019 +0200

[FLINK-13387][WebUI] Fix log download for old UI
---
 .../old-version/partials/taskmanager/taskmanager.log.html | 4 ++--
 .../old-version/partials/taskmanager/taskmanager.stdout.html  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.log.html
 
b/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.log.html
index 54ed86a..112849d 100644
--- 
a/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.log.html
+++ 
b/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.log.html
@@ -24,7 +24,7 @@ limitations under the License.
 
   Task Manager Logs
   
-  
+  
 
   
 
@@ -36,4 +36,4 @@ limitations under the License.
   
 
   
-
\ No newline at end of file
+
diff --git 
a/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.stdout.html
 
b/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.stdout.html
index 7acef7a..c8d8092 100644
--- 
a/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.stdout.html
+++ 
b/flink-runtime-web/web-dashboard/old-version/partials/taskmanager/taskmanager.stdout.html
@@ -24,7 +24,7 @@ limitations under the License.
 
   Task Manager Output
   
-  
+  
 
   
 
@@ -36,4 +36,4 @@ limitations under the License.
   
 
   
-
\ No newline at end of file
+



[flink] 01/03: [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 526236a4d537e78dbac4c575611ef52b602923d9
Author: Zhijiang 
AuthorDate: Tue Jul 16 23:00:09 2019 +0800

[FLINK-13245][network] Fix the bug of file resource leak while canceling partition request

On the producer side, the Netty handler receives the CancelPartitionRequest to release the SubpartitionView resource. In the previous implementation we tried to find the corresponding view via the queue of available readers in PartitionRequestQueue. But in reality the view is not always in this queue, so it would never be released.

Furthermore, the release of the ResultPartition/ResultSubpartitions is based on the reference counter in ReleaseOnConsumptionResultPartition, but while handling the CancelPartitionRequest in PartitionRequestQueue, the ReleaseOnConsumptionResultPartition is never notified of the consumed subpartition. That means the reference counter would never decrease to 0 to trigger the partition release, which causes a file resource leak in the case of BoundedBlockingSubpartition.

To fix these two issues, the corresponding view is now released via the queue of all readers instead, and ReleaseOnConsumptionResultPartition#onConsumedSubpartition is called as well.
---
 .../io/network/netty/PartitionRequestQueue.java| 33 
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index f82a42f..b492ea6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -138,9 +138,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
}
 
for (NetworkSequenceViewReader reader : allReaders.values()) {
-   reader.notifySubpartitionConsumed();
-   reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
+   releaseViewReader(reader);
}
allReaders.clear();
}
@@ -181,19 +179,14 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
return;
}
 
-   // Cancel the request for the input channel
-   int size = availableReaders.size();
-   for (int i = 0; i < size; i++) {
-   NetworkSequenceViewReader reader = 
pollAvailableReader();
-   if (reader.getReceiverId().equals(toCancel)) {
-   reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
-   } else {
-   registerAvailableReader(reader);
-   }
-   }
+   // remove reader from queue of available readers
+   availableReaders.removeIf(reader -> 
reader.getReceiverId().equals(toCancel));
 
-   allReaders.remove(toCancel);
+   // remove reader from queue of all readers and release 
its resource
+   final NetworkSequenceViewReader toRelease = 
allReaders.remove(toCancel);
+   if (toRelease != null) {
+   releaseViewReader(toRelease);
+   }
} else {
ctx.fireUserEventTriggered(msg);
}
@@ -308,14 +301,20 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
private void releaseAllResources() throws IOException {
// note: this is only ever executed by one thread: the Netty IO 
thread!
for (NetworkSequenceViewReader reader : allReaders.values()) {
-   reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
+   releaseViewReader(reader);
}
 
availableReaders.clear();
allReaders.clear();
}
 
+   private void releaseViewReader(NetworkSequenceViewReader reader) throws 
IOException {
+   reader.notifySubpartitionConsumed();
+   reader.setRegisteredAsAvailable(f
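
The diff above is truncated, but the pattern it introduces can be sketched independently: look the reader up in the map of all readers rather than only scanning the queue of available readers, remove it from both structures, and release it, which also notifies consumption so the partition's reference count can reach zero. A hedged, simplified sketch with illustrative names, not the Flink classes themselves:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class CancelRequestSketch {
    interface ViewReader {
        UUID receiverId();
        void notifySubpartitionConsumed();   // decrements the partition's reference count
        void releaseAllResources();          // closes files/buffers held by the view
    }

    private final ArrayDeque<ViewReader> availableReaders = new ArrayDeque<>();
    private final Map<UUID, ViewReader> allReaders = new HashMap<>();

    void onCancelRequest(UUID toCancel) {
        // The reader may or may not currently be in the "available" queue; drop it if it is.
        availableReaders.removeIf(reader -> reader.receiverId().equals(toCancel));

        // The authoritative lookup is the map of all readers. Releasing here also
        // notifies consumption, so the backing resources (e.g. the files of a
        // bounded blocking subpartition) can eventually be cleaned up.
        ViewReader toRelease = allReaders.remove(toCancel);
        if (toRelease != null) {
            toRelease.notifySubpartitionConsumed();
            toRelease.releaseAllResources();
        }
    }
}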

[flink] branch release-1.9 updated (5077f18 -> 7116ab7)

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5077f18  [FLINK-13387][WebUI] Fix log download for old UI
 new 526236a  [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request
 new f738c4e  [FLINK-13245][network] Make subpartition consumption notification independant
 new 7116ab7  [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/netty/PartitionRequestQueue.java| 43 +++---
 .../ReleaseOnConsumptionResultPartition.java   | 42 +++---
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 +-
 6 files changed, 182 insertions(+), 48 deletions(-)



[flink] 03/03: [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7116ab71edc183d34d128453e06a3efc15ad8905
Author: Zhijiang 
AuthorDate: Fri Jul 26 11:50:55 2019 +0200

[FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs
---
 .../runtime/io/network/netty/PartitionRequestQueue.java  | 16 
 1 file changed, 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index b492ea6..4a845d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -62,8 +60,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
/** All the readers created for the consumers' partition requests. */
private final ConcurrentMap 
allReaders = new ConcurrentHashMap<>();
 
-   private final Set released = Sets.newHashSet();
-
private boolean fatalError;
 
private ChannelHandlerContext ctx;
@@ -175,9 +171,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
} else if (msg.getClass() == InputChannelID.class) {
// Release partition view that get a cancel request.
InputChannelID toCancel = (InputChannelID) msg;
-   if (released.contains(toCancel)) {
-   return;
-   }
 
// remove reader from queue of available readers
availableReaders.removeIf(reader -> 
reader.getReceiverId().equals(toCancel));
@@ -222,7 +215,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
if (!reader.isReleased()) {
continue;
}
-   markAsReleased(reader.getReceiverId());
 
Throwable cause = 
reader.getFailureCause();
if (cause != null) {
@@ -312,14 +304,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
reader.notifySubpartitionConsumed();
reader.setRegisteredAsAvailable(false);
reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
-   }
-
-   /**
-* Marks a receiver as released.
-*/
-   private void markAsReleased(InputChannelID receiverId) {
-   released.add(receiverId);
}
 
// This listener is called after an element of the current 
nonEmptyReader has been



[flink] 02/03: [FLINK-13245][network] Make subpartition consumption notification independant

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f738c4edd5858891236a41fc1e453b06cf815089
Author: Zhijiang 
AuthorDate: Fri Jul 26 11:25:40 2019 +0200

[FLINK-13245][network] Make subpartition consumption notification independant
---
 .../ReleaseOnConsumptionResultPartition.java   | 42 +++---
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 ++---
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
index 19ec681..766f500 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -31,11 +30,18 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  */
 public class ReleaseOnConsumptionResultPartition extends ResultPartition {
 
+   private static final Object lock = new Object();
+
+   /**
+* A flag for each subpartition indicating whether it was already 
consumed or not.
+*/
+   private final boolean[] consumedSubpartitions;
+
/**
 * The total number of references to subpartitions of this result. The 
result partition can be
 * safely released, iff the reference count is zero.
 */
-   private final AtomicInteger pendingReferences = new AtomicInteger();
+   private int numUnconsumedSubpartitions;
 
ReleaseOnConsumptionResultPartition(
String owningTaskName,
@@ -47,12 +53,13 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
FunctionWithException bufferPoolFactory) {
super(owningTaskName, partitionId, partitionType, 
subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
 
-   pendingReferences.set(subpartitions.length);
+   this.consumedSubpartitions = new boolean[subpartitions.length];
+   this.numUnconsumedSubpartitions = subpartitions.length;
}
 
@Override
public ResultSubpartitionView createSubpartitionView(int index, 
BufferAvailabilityListener availabilityListener) throws IOException {
-   checkState(pendingReferences.get() > 0, "Partition not 
pinned.");
+   checkState(numUnconsumedSubpartitions > 0, "Partition not 
pinned.");
 
return super.createSubpartitionView(index, 
availabilityListener);
}
@@ -63,22 +70,33 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
return;
}
 
-   int refCnt = pendingReferences.decrementAndGet();
+   final int remainingUnconsumed;
 
-   if (refCnt == 0) {
-   partitionManager.onConsumedPartition(this);
-   } else if (refCnt < 0) {
-   throw new IllegalStateException("All references 
released.");
+   // we synchronize only the bookkeeping section, to avoid 
holding the lock during any
+   // calls into other components
+   synchronized (lock) {
+   if (consumedSubpartitions[subpartitionIndex]) {
+   // repeated call - ignore
+   return;
+   }
+
+   consumedSubpartitions[subpartitionIndex] = true;
+   remainingUnconsumed = (--numUnconsumedSubpartitions);
}
 
-   LOG.debug("{}: Received release notification for subpartition 
{}.",
-   this, subpartitionIndex);
+   LOG.debug("{}: Received consumed notification for subpartition 
{}.", this, subpartitionIndex);
+
+   if (remainingUnconsumed == 0) {
+   partitionManager.onConsumedPartition(this);
+   } else if (remainingUnconsumed < 0) {
+   throw new IllegalStateException("Received consume 
notification even though all subpartitions are already consumed.");
+   }
}
 
@Override
public String toString() {
return "ReleaseOnConsumptionResultPartition " + 
partitionId.toString() + " [" + partitionType + ", "

[flink] 02/03: [FLINK-13245][network] Make subpartition consumption notification independant

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acd15d2527e7c8c137cb8719e15da3edc0a2c994
Author: Zhijiang 
AuthorDate: Fri Jul 26 11:25:40 2019 +0200

[FLINK-13245][network] Make subpartition consumption notification independant
---
 .../ReleaseOnConsumptionResultPartition.java   | 42 +++---
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 ++---
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
index 19ec681..766f500 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -31,11 +30,18 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  */
 public class ReleaseOnConsumptionResultPartition extends ResultPartition {
 
+   private static final Object lock = new Object();
+
+   /**
+* A flag for each subpartition indicating whether it was already 
consumed or not.
+*/
+   private final boolean[] consumedSubpartitions;
+
/**
 * The total number of references to subpartitions of this result. The 
result partition can be
 * safely released, iff the reference count is zero.
 */
-   private final AtomicInteger pendingReferences = new AtomicInteger();
+   private int numUnconsumedSubpartitions;
 
ReleaseOnConsumptionResultPartition(
String owningTaskName,
@@ -47,12 +53,13 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
FunctionWithException bufferPoolFactory) {
super(owningTaskName, partitionId, partitionType, 
subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
 
-   pendingReferences.set(subpartitions.length);
+   this.consumedSubpartitions = new boolean[subpartitions.length];
+   this.numUnconsumedSubpartitions = subpartitions.length;
}
 
@Override
public ResultSubpartitionView createSubpartitionView(int index, 
BufferAvailabilityListener availabilityListener) throws IOException {
-   checkState(pendingReferences.get() > 0, "Partition not 
pinned.");
+   checkState(numUnconsumedSubpartitions > 0, "Partition not 
pinned.");
 
return super.createSubpartitionView(index, 
availabilityListener);
}
@@ -63,22 +70,33 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
return;
}
 
-   int refCnt = pendingReferences.decrementAndGet();
+   final int remainingUnconsumed;
 
-   if (refCnt == 0) {
-   partitionManager.onConsumedPartition(this);
-   } else if (refCnt < 0) {
-   throw new IllegalStateException("All references 
released.");
+   // we synchronize only the bookkeeping section, to avoid 
holding the lock during any
+   // calls into other components
+   synchronized (lock) {
+   if (consumedSubpartitions[subpartitionIndex]) {
+   // repeated call - ignore
+   return;
+   }
+
+   consumedSubpartitions[subpartitionIndex] = true;
+   remainingUnconsumed = (--numUnconsumedSubpartitions);
}
 
-   LOG.debug("{}: Received release notification for subpartition 
{}.",
-   this, subpartitionIndex);
+   LOG.debug("{}: Received consumed notification for subpartition 
{}.", this, subpartitionIndex);
+
+   if (remainingUnconsumed == 0) {
+   partitionManager.onConsumedPartition(this);
+   } else if (remainingUnconsumed < 0) {
+   throw new IllegalStateException("Received consume 
notification even though all subpartitions are already consumed.");
+   }
}
 
@Override
public String toString() {
return "ReleaseOnConsumptionResultPartition " + 
partitionId.toString() + " [" + partitionType + ", "
+ sub

[flink] 01/03: [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 653ba1a1bccb7457c7206c385a49d911b4950483
Author: Zhijiang 
AuthorDate: Tue Jul 16 23:00:09 2019 +0800

[FLINK-13245][network] Fix the bug of file resource leak while canceling partition request

On the producer side, the Netty handler receives the CancelPartitionRequest to release the SubpartitionView resource. In the previous implementation we tried to find the corresponding view via the queue of available readers in PartitionRequestQueue. But in reality the view is not always in this queue, so it would never be released.

Furthermore, the release of the ResultPartition/ResultSubpartitions is based on the reference counter in ReleaseOnConsumptionResultPartition, but while handling the CancelPartitionRequest in PartitionRequestQueue, the ReleaseOnConsumptionResultPartition is never notified of the consumed subpartition. That means the reference counter would never decrease to 0 to trigger the partition release, which causes a file resource leak in the case of BoundedBlockingSubpartition.

To fix these two issues, the corresponding view is now released via the queue of all readers instead, and ReleaseOnConsumptionResultPartition#onConsumedSubpartition is called as well.
---
 .../io/network/netty/PartitionRequestQueue.java| 33 
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index f82a42f..b492ea6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -138,9 +138,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
}
 
for (NetworkSequenceViewReader reader : allReaders.values()) {
-   reader.notifySubpartitionConsumed();
-   reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
+   releaseViewReader(reader);
}
allReaders.clear();
}
@@ -181,19 +179,14 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
return;
}
 
-   // Cancel the request for the input channel
-   int size = availableReaders.size();
-   for (int i = 0; i < size; i++) {
-   NetworkSequenceViewReader reader = 
pollAvailableReader();
-   if (reader.getReceiverId().equals(toCancel)) {
-   reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
-   } else {
-   registerAvailableReader(reader);
-   }
-   }
+   // remove reader from queue of available readers
+   availableReaders.removeIf(reader -> 
reader.getReceiverId().equals(toCancel));
 
-   allReaders.remove(toCancel);
+   // remove reader from queue of all readers and release 
its resource
+   final NetworkSequenceViewReader toRelease = 
allReaders.remove(toCancel);
+   if (toRelease != null) {
+   releaseViewReader(toRelease);
+   }
} else {
ctx.fireUserEventTriggered(msg);
}
@@ -308,14 +301,20 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
private void releaseAllResources() throws IOException {
// note: this is only ever executed by one thread: the Netty IO 
thread!
for (NetworkSequenceViewReader reader : allReaders.values()) {
-   reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
+   releaseViewReader(reader);
}
 
availableReaders.clear();
allReaders.clear();
}
 
+   private void releaseViewReader(NetworkSequenceViewReader reader) throws 
IOException {
+   reader.notifySubpartitionConsumed();
+   reader.setRegisteredAsAvailable(false)

[flink] 03/03: [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71a53d49d1c6ce0e5f840e1b528cb75323dc2665
Author: Zhijiang 
AuthorDate: Fri Jul 26 11:50:55 2019 +0200

[FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs
---
 .../runtime/io/network/netty/PartitionRequestQueue.java  | 16 
 1 file changed, 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index b492ea6..4a845d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -62,8 +60,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
/** All the readers created for the consumers' partition requests. */
private final ConcurrentMap 
allReaders = new ConcurrentHashMap<>();
 
-   private final Set released = Sets.newHashSet();
-
private boolean fatalError;
 
private ChannelHandlerContext ctx;
@@ -175,9 +171,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
} else if (msg.getClass() == InputChannelID.class) {
// Release partition view that get a cancel request.
InputChannelID toCancel = (InputChannelID) msg;
-   if (released.contains(toCancel)) {
-   return;
-   }
 
// remove reader from queue of available readers
availableReaders.removeIf(reader -> 
reader.getReceiverId().equals(toCancel));
@@ -222,7 +215,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
if (!reader.isReleased()) {
continue;
}
-   markAsReleased(reader.getReceiverId());
 
Throwable cause = 
reader.getFailureCause();
if (cause != null) {
@@ -312,14 +304,6 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
reader.notifySubpartitionConsumed();
reader.setRegisteredAsAvailable(false);
reader.releaseAllResources();
-   markAsReleased(reader.getReceiverId());
-   }
-
-   /**
-* Marks a receiver as released.
-*/
-   private void markAsReleased(InputChannelID receiverId) {
-   released.add(receiverId);
}
 
// This listener is called after an element of the current 
nonEmptyReader has been



[flink] branch master updated (12118b9 -> 71a53d4)

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 12118b9  [FLINK-13387][WebUI] Fix log download for old UI
 new 653ba1a  [FLINK-13245][network] Fix the bug of file resource leak while canceling partition request
 new acd15d2  [FLINK-13245][network] Make subpartition consumption notification independant
 new 71a53d4  [FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/netty/PartitionRequestQueue.java| 43 +++---
 .../ReleaseOnConsumptionResultPartition.java   | 42 +++---
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 +-
 6 files changed, 182 insertions(+), 48 deletions(-)



[flink] branch master updated: [hotfix] Fix typo in api_concepts.md

2019-07-29 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new db6529a  [hotfix] Fix typo in api_concepts.md
db6529a is described below

commit db6529aebc74b0a0d3422ecc0fc86cb1c135707d
Author: WangHengwei <35023...@qq.com>
AuthorDate: Mon Jul 29 19:43:34 2019 +0800

[hotfix] Fix typo in api_concepts.md
---
 docs/dev/api_concepts.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index b70ab08..1e8fd18 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -758,7 +758,7 @@ Restrictions apply to classes containing fields that cannot 
be serialized, like
 resources. Classes that follow the Java Beans conventions work well in general.
 
 All classes that are not identified as POJO types (see POJO requirements 
above) are handled by Flink as general class types.
-Flink treats these data types as black boxes and is not able to access their 
content (i.e., for efficient sorting). General types are de/serialized using 
the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
+Flink treats these data types as black boxes and is not able to access their 
content (e.g., for efficient sorting). General types are de/serialized using 
the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
 
  Values
 



[flink] branch release-1.9 updated: [hotfix] Fix typo in api_concepts.md

2019-07-29 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 41675a9  [hotfix] Fix typo in api_concepts.md
41675a9 is described below

commit 41675a9ffda09056d9d2490681475dd044532d40
Author: WangHengwei <35023...@qq.com>
AuthorDate: Mon Jul 29 19:43:34 2019 +0800

[hotfix] Fix typo in api_concepts.md
---
 docs/dev/api_concepts.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index b70ab08..1e8fd18 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -758,7 +758,7 @@ Restrictions apply to classes containing fields that cannot 
be serialized, like
 resources. Classes that follow the Java Beans conventions work well in general.
 
 All classes that are not identified as POJO types (see POJO requirements 
above) are handled by Flink as general class types.
-Flink treats these data types as black boxes and is not able to access their 
content (i.e., for efficient sorting). General types are de/serialized using 
the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
+Flink treats these data types as black boxes and is not able to access their 
content (e.g., for efficient sorting). General types are de/serialized using 
the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
 
  Values
 



[flink] branch master updated: [FLINK-13458][table] ThreadLocalCache clashes for Blink planner

2019-07-29 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 14afeea  [FLINK-13458][table] ThreadLocalCache clashes for Blink planner
14afeea is described below

commit 14afeea06dcd9cb90dfaf471156b34a14380038b
Author: Timo Walther 
AuthorDate: Mon Jul 29 09:32:48 2019 +0200

[FLINK-13458][table] ThreadLocalCache clashes for Blink planner

This closes #9257.
---
 .../flink/table/utils}/ThreadLocalCache.java   | 10 +++--
 .../runtime/functions/DateTimeFunctions.scala  |  1 +
 .../table/runtime/functions/ThreadLocalCache.scala | 49 --
 .../table/runtime/functions/SqlDateTimeUtils.java  |  1 +
 .../table/runtime/functions/SqlFunctionUtils.java  |  1 +
 5 files changed, 10 insertions(+), 52 deletions(-)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
similarity index 90%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
index 39de4df..ca47ab0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
@@ -16,15 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.functions;
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
- * Provides a ThreadLocal cache with a maximum cache size per thread.
- * Values must not be null.
+ * Provides a thread local cache with a maximum cache size per thread.
+ *
+ * Note: Values must not be null.
  */
+@Internal
 public abstract class ThreadLocalCache {
 
private static final int DEFAULT_CACHE_SIZE = 64;
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
index d69a5c9..0bde983 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.runtime.functions
 
+import org.apache.flink.table.utils.ThreadLocalCache
 import org.joda.time.format.DateTimeFormatter
 import org.joda.time.format.DateTimeFormatterBuilder
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
deleted file mode 100644
index b3a8d7a..000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.functions
-
-import java.util.{LinkedHashMap => JLinkedHashMap}
-import java.util.{Map => JMap}
-
-/**
-  * Provides a ThreadLocal cache with a maximum cache size per thread.
-  * Values must not be null.
-  */
-abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) {
-  private val cache = new ThreadLocal[BoundedMap[K, V]]
-
-  protected def getNewInstance(key: K): V
-
-  def get(key: K): V = {
-var m = cache.get
-if (m == null) {
-  m = new BoundedMap(maxSizePerThread)
-  cache.set(m)
-}
-var v = m.get(key)
-if (v == null) {
-  v = getNewInstance(key)
-  m.put(key, v)
-}
-v
-  }
-}
-
-private class BoundedMap[K, V](val 
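
For reference, the cache pattern that was consolidated here (the deleted Scala version above, translated to Java) can be sketched as follows. This is a hedged, self-contained illustration; the real class now lives at org.apache.flink.table.utils.ThreadLocalCache, and the DateTimeFormatter subclass is only an example of typical usage, not Flink code:

import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

abstract class ThreadLocalCacheSketch<K, V> {
    private final int maxSizePerThread;
    private final ThreadLocal<LinkedHashMap<K, V>> cache = new ThreadLocal<>();

    protected ThreadLocalCacheSketch(int maxSizePerThread) {
        this.maxSizePerThread = maxSizePerThread;
    }

    protected abstract V getNewInstance(K key);

    public V get(K key) {
        LinkedHashMap<K, V> map = cache.get();
        if (map == null) {
            // Bounded per-thread map: evict the eldest entry once the map is full.
            map = new LinkedHashMap<K, V>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                    return size() > maxSizePerThread;
                }
            };
            cache.set(map);
        }
        V value = map.get(key);
        if (value == null) {
            value = getNewInstance(key);
            map.put(key, value);
        }
        return value;
    }

    public static void main(String[] args) {
        // Typical usage: cache expensive-to-build, non-thread-safe objects per thread.
        ThreadLocalCacheSketch<String, DateTimeFormatter> formatters =
            new ThreadLocalCacheSketch<String, DateTimeFormatter>(64) {
                @Override
                protected DateTimeFormatter getNewInstance(String pattern) {
                    return DateTimeFormatter.ofPattern(pattern);
                }
            };
        System.out.println(formatters.get("yyyy-MM-dd").format(java.time.LocalDate.of(2019, 7, 29)));
    }
}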

[flink] branch release-1.9 updated: [FLINK-13458][table] ThreadLocalCache clashes for Blink planner

2019-07-29 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 36a7fde  [FLINK-13458][table] ThreadLocalCache clashes for Blink planner
36a7fde is described below

commit 36a7fde092df89e3be7660075df3dcdfafc762b6
Author: Timo Walther 
AuthorDate: Mon Jul 29 09:32:48 2019 +0200

[FLINK-13458][table] ThreadLocalCache clashes for Blink planner

This closes #9257.
---
 .../flink/table/utils}/ThreadLocalCache.java   | 10 +++--
 .../runtime/functions/DateTimeFunctions.scala  |  1 +
 .../table/runtime/functions/ThreadLocalCache.scala | 49 --
 .../table/runtime/functions/SqlDateTimeUtils.java  |  1 +
 .../table/runtime/functions/SqlFunctionUtils.java  |  1 +
 5 files changed, 10 insertions(+), 52 deletions(-)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
similarity index 90%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
index 39de4df..ca47ab0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/ThreadLocalCache.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/ThreadLocalCache.java
@@ -16,15 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.functions;
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
- * Provides a ThreadLocal cache with a maximum cache size per thread.
- * Values must not be null.
+ * Provides a thread local cache with a maximum cache size per thread.
+ *
+ * Note: Values must not be null.
  */
+@Internal
 public abstract class ThreadLocalCache {
 
private static final int DEFAULT_CACHE_SIZE = 64;
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
index d69a5c9..0bde983 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/DateTimeFunctions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.runtime.functions
 
+import org.apache.flink.table.utils.ThreadLocalCache
 import org.joda.time.format.DateTimeFormatter
 import org.joda.time.format.DateTimeFormatterBuilder
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
deleted file mode 100644
index b3a8d7a..000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ThreadLocalCache.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.functions
-
-import java.util.{LinkedHashMap => JLinkedHashMap}
-import java.util.{Map => JMap}
-
-/**
-  * Provides a ThreadLocal cache with a maximum cache size per thread.
-  * Values must not be null.
-  */
-abstract class ThreadLocalCache[K, V](val maxSizePerThread: Int) {
-  private val cache = new ThreadLocal[BoundedMap[K, V]]
-
-  protected def getNewInstance(key: K): V
-
-  def get(key: K): V = {
-var m = cache.get
-if (m == null) {
-  m = new BoundedMap(maxSizePerThread)
-  cache.set(m)
-}
-var v = m.get(key)
-if (v == null) {
-  v = getNewInstance(key)
-  m.put(key, v)
-}
-v
-  }
-}
-
-private class BoundedMap[
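
The relocation makes the cache reusable from both planners; a subclass supplies the factory for the cached value. A minimal sketch of such a subclass, assuming the Java class in flink-table-common keeps the get()/getNewInstance() contract of the deleted Scala version shown above and a default-capacity constructor (the DEFAULT_CACHE_SIZE constant in the diff suggests one); the SimpleDateFormat example is only illustrative, not code from this commit:

    import org.apache.flink.table.utils.ThreadLocalCache;

    import java.text.SimpleDateFormat;

    /** Caches one SimpleDateFormat per pattern and per thread, since the formatter is not thread-safe. */
    public class DateFormatCache extends ThreadLocalCache<String, SimpleDateFormat> {

        @Override
        public SimpleDateFormat getNewInstance(String pattern) {
            // Invoked only on a cache miss for the current thread; the result is kept in the
            // per-thread bounded map and reused on subsequent get(pattern) calls.
            return new SimpleDateFormat(pattern);
        }
    }

This is presumably why DateTimeFunctions.scala, SqlDateTimeUtils.java and SqlFunctionUtils.java in the diffstat above each only gain one import line: callers keep calling get(key), and only the package of the shared cache changes.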

[flink] branch master updated: [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e373c44  [FLINK-13228][tests][filesystems] Harden 
HadoopRecoverableWriterTest
e373c44 is described below

commit e373c4481e6a0ca0e1e73a6170b9e3da5cc9be5b
Author: Yu Li 
AuthorDate: Fri Jul 26 10:27:28 2019 +0200

[FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest

Currently, test cases will fail when trying to close the output stream if all
data has been written but a ClosedByInterruptException occurs at the ending
phase. This commit fixes it.

This closes #9235
---
 .../core/fs/AbstractRecoverableWriterTest.java | 49 ++
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
index ab37a07..de9b095 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
final Path path = new Path(testDir, "part-0");
 
-   try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+   RecoverableFsDataOutputStream stream = null;
+   try {
+   stream = writer.open(path);

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
 
@@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
Assert.assertEquals("part-0", 
fileContents.getKey().getName());
Assert.assertEquals(testData1, 
fileContents.getValue());
}
+   } finally {
+   IOUtils.closeQuietly(stream);
}
}
 
@@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
final Path path = new Path(testDir, "part-0");
 
-   try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+   RecoverableFsDataOutputStream stream = null;
+   try {
+   stream = writer.open(path);

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.persist();
 
@@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
Assert.assertEquals("part-0", 
fileContents.getKey().getName());
Assert.assertEquals(testData1 + testData2, 
fileContents.getValue());
}
+   } finally {
+   IOUtils.closeQuietly(stream);
}
}
 
@@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
final RecoverableWriter initWriter = getNewFileSystemWriter();
 
final Map 
recoverables = new HashMap<>(4);
-   try (final RecoverableFsDataOutputStream stream = 
initWriter.open(path)) {
+   RecoverableFsDataOutputStream stream = null;
+   try {
+   stream = initWriter.open(path);
recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
 

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
@@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {

stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
recoverables.put(FINAL_WITH_EXTRA_STATE, 
stream.persist());
+   } finally {
+   IOUtils.closeQuietly(stream);
}
 
final 
SimpleVersionedSerializer serializer = 
initWriter.getResumeRecoverableSerializer();
@@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
final RecoverableWriter.ResumeRecoverable recoveredRecoverable =

deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
 
-   try (final RecoverableFsDataOutputStream recoveredStream = 
newWriter.recover(recoveredRecoverable)) {
+   RecoverableFsDataOutputStream recoveredStream = null;
+ 
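
The hardening applied in every case above has the same shape: drop try-with-resources, whose implicit close() would rethrow a late ClosedByInterruptException and fail an otherwise passing test, and instead close explicitly in a finally block via IOUtils.closeQuietly. A condensed, self-contained sketch of that pattern (the helper method and payload are illustrative, not part of the test):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;
    import org.apache.flink.util.IOUtils;

    import java.nio.charset.StandardCharsets;

    public class CloseQuietlyPattern {

        static void writeAndCommit(RecoverableWriter writer, Path path, String data) throws Exception {
            RecoverableFsDataOutputStream stream = null;
            try {
                stream = writer.open(path);
                stream.write(data.getBytes(StandardCharsets.UTF_8));
                // Publish the data; assertions on the committed file would follow here.
                stream.closeForCommit().commit();
            } finally {
                // Ignores exceptions from close(), e.g. a ClosedByInterruptException
                // thrown after all data has already been written and committed.
                IOUtils.closeQuietly(stream);
            }
        }
    }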

[flink] branch release-1.9 updated: [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest

2019-07-29 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 0e9f463  [FLINK-13228][tests][filesystems] Harden 
HadoopRecoverableWriterTest
0e9f463 is described below

commit 0e9f463668378bd7469194ebf0af76e3c125f0d7
Author: Yu Li 
AuthorDate: Fri Jul 26 10:27:28 2019 +0200

[FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest

Currently, test cases will fail when trying to close the output stream if all
data has been written but a ClosedByInterruptException occurs at the ending
phase. This commit fixes it.

This closes #9235
---
 .../core/fs/AbstractRecoverableWriterTest.java | 49 ++
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
index ab37a07..de9b095 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
final Path path = new Path(testDir, "part-0");
 
-   try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+   RecoverableFsDataOutputStream stream = null;
+   try {
+   stream = writer.open(path);

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
 
@@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
Assert.assertEquals("part-0", 
fileContents.getKey().getName());
Assert.assertEquals(testData1, 
fileContents.getValue());
}
+   } finally {
+   IOUtils.closeQuietly(stream);
}
}
 
@@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
final Path path = new Path(testDir, "part-0");
 
-   try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+   RecoverableFsDataOutputStream stream = null;
+   try {
+   stream = writer.open(path);

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.persist();
 
@@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
Assert.assertEquals("part-0", 
fileContents.getKey().getName());
Assert.assertEquals(testData1 + testData2, 
fileContents.getValue());
}
+   } finally {
+   IOUtils.closeQuietly(stream);
}
}
 
@@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
final RecoverableWriter initWriter = getNewFileSystemWriter();
 
final Map 
recoverables = new HashMap<>(4);
-   try (final RecoverableFsDataOutputStream stream = 
initWriter.open(path)) {
+   RecoverableFsDataOutputStream stream = null;
+   try {
+   stream = initWriter.open(path);
recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
 

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
@@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {

stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
recoverables.put(FINAL_WITH_EXTRA_STATE, 
stream.persist());
+   } finally {
+   IOUtils.closeQuietly(stream);
}
 
final 
SimpleVersionedSerializer serializer = 
initWriter.getResumeRecoverableSerializer();
@@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
final RecoverableWriter.ResumeRecoverable recoveredRecoverable =

deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
 
-   try (final RecoverableFsDataOutputStream recoveredStream = 
newWriter.recover(recoveredRecoverable)) {
+   RecoverableFsDataOutputStream recoveredStream = nu

[flink] branch master updated: [FLINK-13447][table] Change default planner to legacy planner instead of any one

2019-07-29 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 5b01e71  [FLINK-13447][table] Change default planner to legacy planner 
instead of any one
5b01e71 is described below

commit 5b01e7139c1d0307e93f0e448e75ee6b5e6541ca
Author: Jark Wu 
AuthorDate: Sat Jul 27 23:08:18 2019 +0800

[FLINK-13447][table] Change default planner to legacy planner instead of 
any one

This closes #9249
---
 .../flink/table/api/EnvironmentSettings.java   | 26 +---
 .../flink/table/api/TableEnvironmentTest.scala |  2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |  3 +-
 .../validation/SetOperatorsValidationTest.scala| 11 ---
 .../table/validation/TableSinkValidationTest.scala |  9 +++---
 .../validation/UnsupportedOpsValidationTest.scala  | 20 ++---
 .../planner/plan/utils/FlinkRelOptUtilTest.scala   |  3 +-
 .../runtime/stream/sql/MatchRecognizeITCase.scala  | 35 +++---
 .../runtime/stream/sql/TemporalJoinITCase.scala|  6 ++--
 .../runtime/stream/table/TableSinkITCase.scala | 29 +-
 .../runtime/utils/StreamingWithStateTestBase.scala |  4 +--
 .../flink/table/planner/utils/TableTestBase.scala  |  5 
 12 files changed, 79 insertions(+), 74 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 29c9227..8289bbc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -149,29 +149,35 @@ public class EnvironmentSettings {
 * A builder for {@link EnvironmentSettings}.
 */
public static class Builder {
-   private String plannerClass = null;
-   private String executorClass = null;
+   private static final String OLD_PLANNER_FACTORY = 
"org.apache.flink.table.planner.StreamPlannerFactory";
+   private static final String OLD_EXECUTOR_FACTORY = 
"org.apache.flink.table.executor.StreamExecutorFactory";
+   private static final String BLINK_PLANNER_FACTORY = 
"org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
+   private static final String BLINK_EXECUTOR_FACTORY = 
"org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+
+   private String plannerClass = OLD_PLANNER_FACTORY;
+   private String executorClass = OLD_EXECUTOR_FACTORY;
private String builtInCatalogName = "default_catalog";
private String builtInDatabaseName = "default_database";
private boolean isStreamingMode = true;
 
/**
-* Sets the old Flink planner as the required module. By 
default, {@link #useAnyPlanner()} is
-* enabled.
+* Sets the old Flink planner as the required module.
+*
+* This is the default behavior.
 */
public Builder useOldPlanner() {
-   this.plannerClass = 
"org.apache.flink.table.planner.StreamPlannerFactory";
-   this.executorClass = 
"org.apache.flink.table.executor.StreamExecutorFactory";
+   this.plannerClass = OLD_PLANNER_FACTORY;
+   this.executorClass = OLD_EXECUTOR_FACTORY;
return this;
}
 
/**
-* Sets the Blink planner as the required module. By default, 
{@link #useAnyPlanner()} is
+* Sets the Blink planner as the required module. By default, 
{@link #useOldPlanner()} is
 * enabled.
 */
public Builder useBlinkPlanner() {
-   this.plannerClass = 
"org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
-   this.executorClass = 
"org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+   this.plannerClass = BLINK_PLANNER_FACTORY;
+   this.executorClass = BLINK_EXECUTOR_FACTORY;
return this;
}
 
@@ -180,7 +186,7 @@ public class EnvironmentSettings {
 *
 * A planner will be discovered automatically, if there is 
only one planner available.
 *
-* This is the default behavior.
+* By default, {@link #useOldPlanner()} is enabled.
 */
public Builder useAnyPlanner() {
this.plannerClass = null;
diff --git 
a/flink-table/flink-table
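
For users, the visible effect is that a TableEnvironment created from default settings now resolves to the legacy planner, and the Blink planner has to be requested explicitly. A hedged usage sketch of the builder from the diff above (EnvironmentSettings.newInstance() and TableEnvironment.create() are not shown in the diff but are the usual entry points; inStreamingMode() is already the builder's default and is spelled out only for clarity):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PlannerSelection {

        public static void main(String[] args) {
            // With this change, default settings resolve to the legacy (old) planner.
            EnvironmentSettings legacy = EnvironmentSettings.newInstance().build();

            // The Blink planner now has to be chosen explicitly.
            EnvironmentSettings blink = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();

            TableEnvironment blinkTableEnv = TableEnvironment.create(blink);
        }
    }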

[flink] branch release-1.9 updated: [FLINK-13447][table] Change default planner to legacy planner instead of any one

2019-07-29 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new c763cef  [FLINK-13447][table] Change default planner to legacy planner 
instead of any one
c763cef is described below

commit c763cef2f0d244b305fc9e3c70be0d6c89d5eba5
Author: Jark Wu 
AuthorDate: Sat Jul 27 23:08:18 2019 +0800

[FLINK-13447][table] Change default planner to legacy planner instead of 
any one

This closes #9249
---
 .../flink/table/api/EnvironmentSettings.java   | 26 +---
 .../flink/table/api/TableEnvironmentTest.scala |  2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |  3 +-
 .../validation/SetOperatorsValidationTest.scala| 11 ---
 .../table/validation/TableSinkValidationTest.scala |  9 +++---
 .../validation/UnsupportedOpsValidationTest.scala  | 20 ++---
 .../planner/plan/utils/FlinkRelOptUtilTest.scala   |  3 +-
 .../runtime/stream/sql/MatchRecognizeITCase.scala  | 35 +++---
 .../runtime/stream/sql/TemporalJoinITCase.scala|  6 ++--
 .../runtime/stream/table/TableSinkITCase.scala | 29 +-
 .../runtime/utils/StreamingWithStateTestBase.scala |  4 +--
 .../flink/table/planner/utils/TableTestBase.scala  |  5 
 12 files changed, 79 insertions(+), 74 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 29c9227..8289bbc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -149,29 +149,35 @@ public class EnvironmentSettings {
 * A builder for {@link EnvironmentSettings}.
 */
public static class Builder {
-   private String plannerClass = null;
-   private String executorClass = null;
+   private static final String OLD_PLANNER_FACTORY = 
"org.apache.flink.table.planner.StreamPlannerFactory";
+   private static final String OLD_EXECUTOR_FACTORY = 
"org.apache.flink.table.executor.StreamExecutorFactory";
+   private static final String BLINK_PLANNER_FACTORY = 
"org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
+   private static final String BLINK_EXECUTOR_FACTORY = 
"org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+
+   private String plannerClass = OLD_PLANNER_FACTORY;
+   private String executorClass = OLD_EXECUTOR_FACTORY;
private String builtInCatalogName = "default_catalog";
private String builtInDatabaseName = "default_database";
private boolean isStreamingMode = true;
 
/**
-* Sets the old Flink planner as the required module. By 
default, {@link #useAnyPlanner()} is
-* enabled.
+* Sets the old Flink planner as the required module.
+*
+* This is the default behavior.
 */
public Builder useOldPlanner() {
-   this.plannerClass = 
"org.apache.flink.table.planner.StreamPlannerFactory";
-   this.executorClass = 
"org.apache.flink.table.executor.StreamExecutorFactory";
+   this.plannerClass = OLD_PLANNER_FACTORY;
+   this.executorClass = OLD_EXECUTOR_FACTORY;
return this;
}
 
/**
-* Sets the Blink planner as the required module. By default, 
{@link #useAnyPlanner()} is
+* Sets the Blink planner as the required module. By default, 
{@link #useOldPlanner()} is
 * enabled.
 */
public Builder useBlinkPlanner() {
-   this.plannerClass = 
"org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
-   this.executorClass = 
"org.apache.flink.table.planner.delegation.BlinkExecutorFactory";
+   this.plannerClass = BLINK_PLANNER_FACTORY;
+   this.executorClass = BLINK_EXECUTOR_FACTORY;
return this;
}
 
@@ -180,7 +186,7 @@ public class EnvironmentSettings {
 *
 * A planner will be discovered automatically, if there is 
only one planner available.
 *
-* This is the default behavior.
+* By default, {@link #useOldPlanner()} is enabled.
 */
public Builder useAnyPlanner() {
this.plannerClass = null;
diff --git 
a/flink-table/f

[flink] branch master updated: [FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test

2019-07-29 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7fec439  [FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test
7fec439 is described below

commit 7fec4392b21b07c69ba15ea554731886f181609e
Author: ifndef-SleePy 
AuthorDate: Tue Jul 23 14:29:36 2019 +0200

[FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test
---
 flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index a22c6d4..2ecea72 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -21,8 +21,9 @@ source "$(dirname "$0")"/common.sh
 
 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar
 
-# enable DEBUG logging level to retrieve truncate length later
-sed -i -e 
's/#log4j.logger.org.apache.flink=INFO/log4j.logger.org.apache.flink=DEBUG/g' 
$FLINK_DIR/conf/log4j.properties
+# enable DEBUG logging level for the BucketingSink to retrieve truncate length 
later
+echo "" >> $FLINK_DIR/conf/log4j.properties
+echo 
"log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink=DEBUG"
 >> $FLINK_DIR/conf/log4j.properties
 
 set_conf_ssl
 start_cluster



[flink] branch release-1.9 updated: [FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test

2019-07-29 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 32ec971  [FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test
32ec971 is described below

commit 32ec971b3617e8fe4fc87334f747ed44afceb4c2
Author: ifndef-SleePy 
AuthorDate: Tue Jul 23 14:29:36 2019 +0200

[FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test
---
 flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index a22c6d4..2ecea72 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -21,8 +21,9 @@ source "$(dirname "$0")"/common.sh
 
 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar
 
-# enable DEBUG logging level to retrieve truncate length later
-sed -i -e 
's/#log4j.logger.org.apache.flink=INFO/log4j.logger.org.apache.flink=DEBUG/g' 
$FLINK_DIR/conf/log4j.properties
+# enable DEBUG logging level for the BucketingSink to retrieve truncate length 
later
+echo "" >> $FLINK_DIR/conf/log4j.properties
+echo 
"log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink=DEBUG"
 >> $FLINK_DIR/conf/log4j.properties
 
 set_conf_ssl
 start_cluster



[flink] branch release-1.8 updated: [FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test

2019-07-29 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new c4e9743  [FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test
c4e9743 is described below

commit c4e97436bb0ef5a658ff4a9117b4776d58f888c0
Author: ifndef-SleePy 
AuthorDate: Tue Jul 23 14:29:36 2019 +0200

[FLINK-9526][e2e] Fix unstable BucketingSink end-to-end test
---
 flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index ba43e21..b3d19fd 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -21,8 +21,9 @@ source "$(dirname "$0")"/common.sh
 
 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar
 
-# enable DEBUG logging level to retrieve truncate length later
-sed -i -e 
's/#log4j.logger.org.apache.flink=INFO/log4j.logger.org.apache.flink=DEBUG/g' 
$FLINK_DIR/conf/log4j.properties
+# enable DEBUG logging level for the BucketingSink to retrieve truncate length 
later
+echo "" >> $FLINK_DIR/conf/log4j.properties
+echo 
"log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink=DEBUG"
 >> $FLINK_DIR/conf/log4j.properties
 
 set_conf_ssl
 start_cluster



[flink-web] branch asf-site updated: add myself to the flink community page

2019-07-29 Thread rongr
This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 3acaa86  add myself to the flink community page
3acaa86 is described below

commit 3acaa862cb998345da10ffb8bfff754d77e16d6a
Author: Rong Rong 
AuthorDate: Sun Jul 28 16:48:46 2019 -0700

add myself to the flink community page
---
 community.md  | 6 ++
 community.zh.md   | 6 ++
 content/community.html| 6 ++
 content/zh/community.html | 6 ++
 4 files changed, 24 insertions(+)

diff --git a/community.md b/community.md
index 4fd0b12..535f9ba 100644
--- a/community.md
+++ b/community.md
@@ -329,6 +329,12 @@ Flink Forward is a conference happening yearly in 
different locations around the
 trohrmann
   
   
+https://avatars1.githubusercontent.com/u/3581352?s=50"; 
class="committer-avatar">
+Rong Rong
+Committer
+rongr
+  
+  
 https://avatars0.githubusercontent.com/u/105434?s=50"; 
class="committer-avatar">
 Henry Saputra
 PMC, Committer
diff --git a/community.zh.md b/community.zh.md
index b596f1c..6fd0be6 100644
--- a/community.zh.md
+++ b/community.zh.md
@@ -324,6 +324,12 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最
 trohrmann
   
   
+https://avatars1.githubusercontent.com/u/3581352?s=50"; 
class="committer-avatar">
+Rong Rong
+Committer
+rongr
+  
+  
 https://avatars0.githubusercontent.com/u/105434?s=50"; 
class="committer-avatar">
 Henry Saputra
 PMC, Committer
diff --git a/content/community.html b/content/community.html
index 08d811a..8aa79b3 100644
--- a/content/community.html
+++ b/content/community.html
@@ -517,6 +517,12 @@
 trohrmann
   
   
+https://avatars1.githubusercontent.com/u/3581352?s=50"; 
class="committer-avatar" />
+Rong Rong
+Committer
+rongr
+  
+  
 https://avatars0.githubusercontent.com/u/105434?s=50"; 
class="committer-avatar" />
 Henry Saputra
 PMC, Committer
diff --git a/content/zh/community.html b/content/zh/community.html
index a760b42..6688913 100644
--- a/content/zh/community.html
+++ b/content/zh/community.html
@@ -514,6 +514,12 @@
 trohrmann
   
   
+https://avatars1.githubusercontent.com/u/3581352?s=50"; 
class="committer-avatar" />
+Rong Rong
+Committer
+rongr
+  
+  
 https://avatars0.githubusercontent.com/u/105434?s=50"; 
class="committer-avatar" />
 Henry Saputra
 PMC, Committer



[flink] branch master updated: [FLINK-12747][docs] Getting Started - Table Api Walkthrough

2019-07-29 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f4943dd  [FLINK-12747][docs] Getting Started - Table Api Walkthrough
f4943dd is described below

commit f4943dd06b2ff7da899812a2aaa2e0b24c2afc01
Author: Seth Wiesman 
AuthorDate: Wed Jun 26 14:48:38 2019 -0500

[FLINK-12747][docs] Getting Started - Table Api Walkthrough
---
 docs/getting-started/examples/index.md |   2 +-
 docs/getting-started/examples/index.zh.md  |   2 +-
 docs/getting-started/tutorials/index.md|   2 +-
 docs/getting-started/tutorials/index.zh.md |   2 +-
 .../{tutorials => walkthroughs}/index.md   |   8 +-
 .../{tutorials => walkthroughs}/index.zh.md|   8 +-
 docs/getting-started/walkthroughs/table_api.md | 494 +
 docs/getting-started/walkthroughs/table_api.zh.md  | 494 +
 flink-end-to-end-tests/run-nightly-tests.sh|   3 +
 .../test-scripts/test_table_walkthroughs.sh|  74 +++
 .../flink-walkthrough-common/pom.xml   |  59 +++
 .../walkthrough/common/entity/Transaction.java |  93 
 .../common/sink/LoggerOutputFormat.java|  50 +++
 .../common/source/TransactionIterator.java | 130 ++
 .../common/source/TransactionRowInputFormat.java   |  60 +++
 .../common/source/TransactionSource.java   |  65 +++
 .../table/BoundedTransactionTableSource.java   |  55 +++
 .../common/table/SpendReportTableSink.java |  95 
 .../common/table/TruncateDateToHour.java   |  46 ++
 .../table/UnboundedTransactionTableSource.java |  89 
 .../flink-walkthrough-table-java/pom.xml   |  26 +-
 .../META-INF/maven/archetype-metadata.xml  |  25 +-
 .../src/main/resources/archetype-resources/pom.xml | 263 +++
 .../src/main/java/SpendReport.java |  45 ++
 .../src/main/resources/log4j.properties|  24 +
 .../flink-walkthrough-table-scala/pom.xml  |  26 +-
 .../META-INF/maven/archetype-metadata.xml  |  25 +-
 .../src/main/resources/archetype-resources/pom.xml | 300 +
 .../src/main/resources/log4j.properties|  24 +
 .../src/main/scala/SpendReport.scala   |  41 ++
 flink-walkthroughs/pom.xml |  95 
 pom.xml|   1 +
 32 files changed, 2686 insertions(+), 40 deletions(-)

diff --git a/docs/getting-started/examples/index.md 
b/docs/getting-started/examples/index.md
index 6810bbd..d4d315c 100644
--- a/docs/getting-started/examples/index.md
+++ b/docs/getting-started/examples/index.md
@@ -3,7 +3,7 @@ title: Examples
 nav-id: examples
 nav-title: ' Examples'
 nav-parent_id: getting-started
-nav-pos: 2
+nav-pos: 3
 nav-show_overview: true
 ---
 
+-->
\ No newline at end of file
diff --git a/docs/getting-started/tutorials/index.zh.md 
b/docs/getting-started/walkthroughs/index.zh.md
similarity index 84%
copy from docs/getting-started/tutorials/index.zh.md
copy to docs/getting-started/walkthroughs/index.zh.md
index 540a6c4..ae2f536 100644
--- a/docs/getting-started/tutorials/index.zh.md
+++ b/docs/getting-started/walkthroughs/index.zh.md
@@ -1,7 +1,7 @@
 ---
-title: "教程"
-nav-id: tutorials
-nav-title: ' 
教程'
+title: "Code Walkthroughs"
+nav-id: walkthroughs
+nav-title: ' Code 
Walkthroughs'
 nav-parent_id: getting-started
 nav-pos: 1
 ---
@@ -22,4 +22,4 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+-->
\ No newline at end of file
diff --git a/docs/getting-started/walkthroughs/table_api.md 
b/docs/getting-started/walkthroughs/table_api.md
new file mode 100644
index 000..878fb54
--- /dev/null
+++ b/docs/getting-started/walkthroughs/table_api.md
@@ -0,0 +1,494 @@
+---
+title: "Table API"
+nav-id: tableapiwalkthrough
+nav-title: 'Table API'
+nav-parent_id: walkthroughs
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, batch data sets and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Will You Be Building? 
+
+In this tutorial, you will learn how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+You will start by building your report as a nightly batch job, and then 
migrate to a streaming pipeline.
+
+## Prerequisites
+
+This walkthrough assumes that you have some familiarity with Java or Scala, 
but you s
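
Going by the diffstat above (a bounded transaction source, a report sink, an hour-truncating scalar function, and a SpendReport archetype class), the batch starting point of the walkthrough has roughly the following shape. This is a hedged sketch under those assumptions, not the walkthrough's actual listing; the package names are inferred from the file paths and the query itself is illustrative:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
    import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
    import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

    public class SpendReport {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            // Source, sink and scalar function come from flink-walkthrough-common (names per the diffstat).
            tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
            tEnv.registerTableSink("spend_report", new SpendReportTableSink());
            tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());

            // Aggregate spending per account and hour, then write the nightly report.
            tEnv
                .scan("transactions")
                .select("accountId, timestamp.truncateDateToHour as timestamp, amount")
                .groupBy("accountId, timestamp")
                .select("accountId, timestamp, amount.sum as total")
                .insertInto("spend_report");

            env.execute("Spend Report");
        }
    }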

buildbot failure in on flink-docs-master

2019-07-29 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-master while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/1549

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

BUILD FAILED: failed Build Python docs

Sincerely,
 -The Buildbot





[flink] branch master updated: [FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch case

2019-07-29 Thread rongr
This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new ef29f30  [FLINK-13347][table-planner] should handle SEMI/ANTI 
JoinRelType in switch case
ef29f30 is described below

commit ef29f305cd3d907d7c445c271b314ea643baaeeb
Author: godfreyhe 
AuthorDate: Thu Jul 25 17:06:50 2019 +0800

[FLINK-13347][table-planner] should handle SEMI/ANTI JoinRelType in switch 
case

This closes #9227.
---
 .../scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala | 2 +-
 .../main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala | 2 ++
 .../org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala   | 1 +
 .../nodes/datastream/DataStreamJoinToCoProcessTranslator.scala| 8 +---
 .../flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala  | 2 ++
 .../org/apache/flink/table/runtime/join/WindowJoinUtil.scala  | 4 ++--
 6 files changed, 13 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index bd2e8fa..923307c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -100,7 +100,7 @@ trait CommonCorrelate {
|}
|""".stripMargin
 } else if (joinType != JoinRelType.INNER) {
-  throw new TableException(s"Unsupported SemiJoinType: $joinType for 
correlate join.")
+  throw new TableException(s"Unsupported JoinRelType: $joinType for 
correlate join.")
 }
 
 functionGenerator.generateFunction(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
index 3d98a4d..753ec40 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonJoin.scala
@@ -45,6 +45,8 @@ trait CommonJoin {
   case JoinRelType.LEFT=> "LeftOuterJoin"
   case JoinRelType.RIGHT => "RightOuterJoin"
   case JoinRelType.FULL => "FullOuterJoin"
+  case JoinRelType.SEMI => "SemiJoin"
+  case JoinRelType.ANTI => "AntiJoin"
 }
   }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index 1df75e6..2e319f4 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -199,6 +199,7 @@ class DataSetJoin(
   rightKeys.toArray,
   returnType,
   config)
+  case _ => throw new TableException(s"$joinType is not supported.")
 }
   }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 846e452..8c418a3 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -18,13 +18,11 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator
-import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}
+import org.apache.flink.table.api.{StreamQueryConfig, TableConfig, 
ValidationException}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowKeySelector
@@ -32,6 +30,9 @@ import org.apache.flink.table.runtime.join._
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+impo
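
In recent Calcite versions JoinRelType gained SEMI and ANTI, so any match or switch over the enum has to either handle them or fail fast with a clear error, as the planner changes above do. A small Java sketch of the same defensive pattern (the method name and strings mirror the Scala code in the diff but are otherwise illustrative, not Flink code):

    import org.apache.calcite.rel.core.JoinRelType;
    import org.apache.flink.table.api.TableException;

    public class JoinTypeNames {

        /** Maps a Calcite join type to a readable operator name, rejecting unsupported kinds. */
        static String joinTypeToString(JoinRelType joinType) {
            switch (joinType) {
                case INNER:
                    return "InnerJoin";
                case LEFT:
                    return "LeftOuterJoin";
                case RIGHT:
                    return "RightOuterJoin";
                case FULL:
                    return "FullOuterJoin";
                case SEMI:
                    return "SemiJoin";
                case ANTI:
                    return "AntiJoin";
                default:
                    // Fail fast on anything the translator cannot handle, instead of a silent fall-through.
                    throw new TableException(joinType + " is not supported.");
            }
        }
    }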