[flink] branch master updated: [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation

2019-05-22 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7a3f081  [FLINK-12411][table-planner][tests] Workaround limited 
support of not nullable fields in window aggregation
7a3f081 is described below

commit 7a3f0813d15982177318c6c58cbe1d83799132e4
Author: Dawid Wysakowicz 
AuthorDate: Mon May 6 09:06:35 2019 +0200

[FLINK-12411][table-planner][tests] Workaround limited support of not 
nullable fields in window aggregation

This does not fix the problem. It just makes the e2e test pass. For a
proper fix of the underlying problem see FLINK-12249.
---
 .../main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java| 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
 
b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
index 47bca8e..cde040d 100644
--- 
a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
@@ -106,7 +106,9 @@ public class StreamSQLTestProgram {
String tumbleQuery = String.format(
"SELECT " +
"  key, " +
-   "  CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 ELSE 99 END 
AS correct, " +
+   //TODO: The "WHEN -1 THEN NULL" part is a temporary 
workaround, to make the test pass, for
+   // https://issues.apache.org/jira/browse/FLINK-12249. 
We should remove it once the issue is fixed.
+   "  CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 WHEN -1 
THEN NULL ELSE 99 END AS correct, " +
"  TUMBLE_START(rowtime, INTERVAL '%d' SECOND) AS 
wStart, " +
"  TUMBLE_ROWTIME(rowtime, INTERVAL '%d' SECOND) AS 
rowtime " +
"FROM (%s) " +



[flink] branch master updated: [FLINK-12563][table-runtime-blink] Introduce vector data format in blink

2019-05-22 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1e6ce1b  [FLINK-12563][table-runtime-blink] Introduce vector data 
format in blink
1e6ce1b is described below

commit 1e6ce1bb0c38be3ccf6baa5f3c7acedb86528080
Author: Jingsong Lee 
AuthorDate: Thu May 23 14:45:24 2019 +0800

[FLINK-12563][table-runtime-blink] Introduce vector data format in blink

This closes #8492
---
 .../apache/flink/table/dataformat/ColumnarRow.java | 210 +
 .../dataformat/vector/AbstractColumnVector.java|  62 ++
 .../dataformat/vector/BooleanColumnVector.java |  26 +++
 .../table/dataformat/vector/ByteColumnVector.java  |  26 +++
 .../table/dataformat/vector/BytesColumnVector.java |  48 +
 .../table/dataformat/vector/ColumnVector.java  |  32 
 .../flink/table/dataformat/vector/Dictionary.java  |  34 
 .../dataformat/vector/DoubleColumnVector.java  |  26 +++
 .../table/dataformat/vector/FloatColumnVector.java |  26 +++
 .../table/dataformat/vector/IntColumnVector.java   |  26 +++
 .../table/dataformat/vector/LongColumnVector.java  |  26 +++
 .../table/dataformat/vector/ShortColumnVector.java |  26 +++
 .../dataformat/vector/VectorizedColumnBatch.java   | 134 +
 .../dataformat/vector/heap/AbstractHeapVector.java | 132 +
 .../dataformat/vector/heap/HeapBooleanVector.java  |  51 +
 .../dataformat/vector/heap/HeapByteVector.java |  50 +
 .../dataformat/vector/heap/HeapBytesVector.java| 137 ++
 .../dataformat/vector/heap/HeapDoubleVector.java   |  52 +
 .../dataformat/vector/heap/HeapFloatVector.java|  51 +
 .../dataformat/vector/heap/HeapIntVector.java  |  50 +
 .../dataformat/vector/heap/HeapLongVector.java |  50 +
 .../dataformat/vector/heap/HeapShortVector.java|  50 +
 .../vector/VectorizedColumnBatchTest.java  | 190 +++
 23 files changed, 1515 insertions(+)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
new file mode 100644
index 000..8764c98
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
+import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
+
+/**
+ * Columnar row to support access to vector column data. It is a row view in 
{@link VectorizedColumnBatch}.
+ */
+public final class ColumnarRow implements BaseRow {
+   private byte header;
+   private VectorizedColumnBatch vectorizedColumnBatch;
+   private int rowId;
+
+   public ColumnarRow() {}
+
+   public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) {
+   this(vectorizedColumnBatch, 0);
+   }
+
+   public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int 
rowId) {
+   this.vectorizedColumnBatch = vectorizedColumnBatch;
+   this.rowId = rowId;
+   }
+
+   public void setVectorizedColumnBatch(
+   VectorizedColumnBatch vectorizedColumnBatch) {
+   this.vectorizedColumnBatch = vectorizedColumnBatch;
+   this.rowId = 0;
+   }
+
+   public void setRowId(int rowId) {
+   this.rowId = rowId;
+   }
+
+   @Override
+   public byte getHeader() {
+   return header;
+   }
+
+   @Override
+   public void setHeader(byte header) {
+   this.header = header;
+   }
+
+   @Override
+   public int getArity() {
+   return vectorizedColumnBatch.getArity();
+   }
+
+   @Override
+   public boolean isNullAt(int ordinal) {
+   return vectorizedColumnBatch.isNullAt(rowId, ordi
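To show how the row view is meant to be consumed, here is a small sketch that scans a batch through a single reused ColumnarRow. It is illustrative only: getNumRows() on VectorizedColumnBatch and the typed getInt() accessor are assumed from this commit's API, and the scan method itself is not part of the commit.

```java
import org.apache.flink.table.dataformat.ColumnarRow;
import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;

public class ColumnarRowScanSketch {
    // Sum the first int column of a batch through one reused row view.
    public static long sumFirstIntColumn(VectorizedColumnBatch batch) {
        ColumnarRow row = new ColumnarRow(batch);
        long sum = 0;
        for (int r = 0; r < batch.getNumRows(); r++) { // getNumRows() assumed
            row.setRowId(r);          // move the view; no per-row allocation
            if (!row.isNullAt(0)) {
                sum += row.getInt(0); // reads column vector 0 at row r
            }
        }
        return sum;
    }
}
```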

[flink] branch master updated: [FLINK-12582][table][hive] Alteration APIs in catalogs should check that existing object and new object are of the same class

2019-05-22 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new aca9c01  [FLINK-12582][table][hive] Alteration APIs in catalogs should check that existing object and new object are of the same class
aca9c01 is described below

commit aca9c018a4b26e50333c33426e77323cbb9b7960
Author: bowen.li 
AuthorDate: Wed May 22 14:34:42 2019 -0700

[FLINK-12582][table][hive] Alteration APIs in catalogs should check that existing object and new object are of the same class

This PR makes the alteration APIs in catalogs check that the existing object and the new object are of the same class.

Most of them currently don't, e.g. you can alter an existing generic table with a new Hive table in GenericInMemoryCatalog.

This closes #8514.
---
 .../flink/table/catalog/hive/HiveCatalog.java  |  55 
 .../hive/HiveCatalogGenericMetadataTest.java   |  16 ---
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |   3 +
 .../table/catalog/GenericInMemoryCatalog.java  |  41 +-
 .../table/catalog/GenericInMemoryCatalogTest.java  |  40 +++---
 .../flink/table/catalog/CatalogTestBase.java   | 155 ++---
 6 files changed, 218 insertions(+), 92 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index d6818f7..22fe08b 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -27,8 +27,6 @@ import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogFunction;
 import org.apache.flink.table.catalog.GenericCatalogTable;
@@ -230,14 +228,27 @@ public class HiveCatalog implements Catalog {

checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
checkNotNull(newDatabase, "newDatabase cannot be null");
 
+   CatalogDatabase existingDatabase;
+   try {
+   existingDatabase = getDatabase(databaseName);
+   } catch (DatabaseNotExistException e) {
+   if (!ignoreIfNotExists) {
+   throw e;
+   }
+   return;
+   }
+
+   if (existingDatabase.getClass() != newDatabase.getClass()) {
+   throw new CatalogException(
+   String.format("Database types don't match. 
Existing database is '%s' and new database is '%s'.",
+   existingDatabase.getClass().getName(), 
newDatabase.getClass().getName())
+   );
+   }
+
Database newHiveDatabase = 
instantiateHiveDatabase(databaseName, newDatabase);
 
try {
-   if (databaseExists(databaseName)) {
-   client.alterDatabase(databaseName, 
newHiveDatabase);
-   } else if (!ignoreIfNotExists) {
-   throw new 
DatabaseNotExistException(catalogName, databaseName);
-   }
+   client.alterDatabase(databaseName, newHiveDatabase);
} catch (TException e) {
throw new CatalogException(String.format("Failed to 
alter database %s", databaseName), e);
}
@@ -360,32 +371,22 @@ public class HiveCatalog implements Catalog {
checkNotNull(tablePath, "tablePath cannot be null");
checkNotNull(newCatalogTable, "newCatalogTable cannot be null");
 
-   if (!tableExists(tablePath)) {
+   Table hiveTable;
+   try {
+   hiveTable = getHiveTable(tablePath);
+   } catch (TableNotExistException e) {
if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
+   throw e;
}
return;
}
 
-   Table oldTable = getHiveTable(tablePath);
-   TableType oldTableType = 
TableType.valueOf(oldTable.getTableType());
+   
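The same guard recurs in each alteration API of both catalogs; a condensed sketch of the pattern follows. The helper below is hypothetical (the commit inlines the check at every call site), and the CatalogException import path is assumed.

```java
import org.apache.flink.table.catalog.exceptions.CatalogException; // import path assumed

class AlterationCheckSketch {
    // Hypothetical helper distilling the guard this commit adds to each
    // alteration API: an object may only be replaced by one of its own class.
    static void checkSameClass(Object existing, Object replacement, String what) {
        if (existing.getClass() != replacement.getClass()) {
            throw new CatalogException(String.format(
                    "%s types don't match. Existing %s is '%s' and new %s is '%s'.",
                    what, what, existing.getClass().getName(),
                    what, replacement.getClass().getName()));
        }
    }
}
```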

[flink] branch master updated: [FLINK-12335][table-runtime-blink] Improve the code and performance of class SegmentsUtil

2019-05-22 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 11a96fd  [FLINK-12335][table-runtime-blink] Improve the code and performance of class SegmentsUtil
11a96fd is described below

commit 11a96fdf213467595dad73cffd9b05134a4d0d75
Author: liyafan82 
AuthorDate: Fri Apr 26 17:43:26 2019 +0800

[FLINK-12335][table-runtime-blink] Improve the code and performance of class SegmentsUtil

This closes #8278
---
 .../org/apache/flink/table/util/SegmentsUtil.java  | 27 ++
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
index daf7ebd..47475a4 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java
@@ -36,7 +36,7 @@ public class SegmentsUtil {
 */
public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN;
 
-   private static final int BIT_BYTE_POSITION_MASK = 0xfff8;
+   private static final int ADDRESS_BITS_PER_WORD = 3;
 
private static final int BIT_BYTE_INDEX_MASK = 7;
 
@@ -410,6 +410,15 @@ public class SegmentsUtil {
}
 
/**
+* Given a bit index, return the byte index containing it.
+* @param bitIndex the bit index.
+* @return the byte index.
+*/
+   private static int byteIndex(int bitIndex) {
+   return bitIndex >>> ADDRESS_BITS_PER_WORD;
+   }
+
+   /**
 * unset bit.
 *
 * @param segment target segment.
@@ -417,7 +426,7 @@ public class SegmentsUtil {
 * @param index bit index from base offset.
 */
public static void bitUnSet(MemorySegment segment, int baseOffset, int 
index) {
-   int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+   int offset = baseOffset + byteIndex(index);
byte current = segment.get(offset);
current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
segment.put(offset, current);
@@ -431,7 +440,7 @@ public class SegmentsUtil {
 * @param index bit index from base offset.
 */
public static void bitSet(MemorySegment segment, int baseOffset, int 
index) {
-   int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+   int offset = baseOffset + byteIndex(index);
byte current = segment.get(offset);
current |= (1 << (index & BIT_BYTE_INDEX_MASK));
segment.put(offset, current);
@@ -445,7 +454,7 @@ public class SegmentsUtil {
 * @param index bit index from base offset.
 */
public static boolean bitGet(MemorySegment segment, int baseOffset, int 
index) {
-   int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+   int offset = baseOffset + byteIndex(index);
byte current = segment.get(offset);
return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0;
}
@@ -460,7 +469,7 @@ public class SegmentsUtil {
public static void bitUnSet(MemorySegment[] segments, int baseOffset, 
int index) {
if (segments.length == 1) {
MemorySegment segment = segments[0];
-   int offset = baseOffset + ((index & 
BIT_BYTE_POSITION_MASK) >>> 3);
+   int offset = baseOffset + byteIndex(index);
byte current = segment.get(offset);
current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
segment.put(offset, current);
@@ -470,7 +479,7 @@ public class SegmentsUtil {
}
 
private static void bitUnSetMultiSegments(MemorySegment[] segments, int 
baseOffset, int index) {
-   int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 
3);
+   int offset = baseOffset + byteIndex(index);
int segSize = segments[0].size();
int segIndex = offset / segSize;
int segOffset = offset - segIndex * segSize; // equal to %
@@ -490,7 +499,7 @@ public class SegmentsUtil {
 */
public static void bitSet(MemorySegment[] segments, int baseOffset, int 
index) {
if (segments.length == 1) {
-   int offset = baseOffset + ((index & 
BIT_BYTE_POSITION_MASK) >>> 3);
+   int offset = baseOffset + byteIndex(index);
MemorySegment segment = segments[0
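The named helper replaces the mask-and-shift idiom used before; both compute the byte that holds a given bit. A standalone sketch of the arithmetic, showing why the shift alone suffices:

```java
public class BitIndexSketch {
    private static final int ADDRESS_BITS_PER_WORD = 3; // log2(8 bits per byte)
    private static final int BIT_BYTE_INDEX_MASK = 7;   // position of a bit inside its byte

    // Given a bit index, return the index of the byte containing it.
    static int byteIndex(int bitIndex) {
        return bitIndex >>> ADDRESS_BITS_PER_WORD; // same as bitIndex / 8
    }

    public static void main(String[] args) {
        int bitIndex = 13;
        // Old idiom: mask away the low bits, then shift.
        int oldStyle = (bitIndex & 0xfff8) >>> 3;
        // New idiom: the shift alone already discards the low three bits,
        // and it stays correct for bit indexes >= 2^16, which the 16-bit
        // mask 0xfff8 would silently truncate.
        int newStyle = byteIndex(bitIndex);
        System.out.println(oldStyle + " " + newStyle);      // 1 1: bit 13 lives in byte 1
        System.out.println(bitIndex & BIT_BYTE_INDEX_MASK); // 5: offset within that byte
    }
}
```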

[flink] branch release-1.8 updated: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)

2019-05-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 7b862dc  [FLINK-12539] [fs-connector] Make StreamingFileSink 
customizable (#8469)
7b862dc is described below

commit 7b862dcd91a2506f99dd6e1feacab57bc78fa4e6
Author: Kailash Dayanand 
AuthorDate: Wed May 22 10:40:27 2019 -0700

[FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
---
 .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index dc0b1c6..f5b1bf9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -127,7 +127,7 @@ public class StreamingFileSink
/**
 * Creates a new {@code StreamingFileSink} that writes files to the 
given base directory.
 */
-   private StreamingFileSink(
+   protected StreamingFileSink(
final StreamingFileSink.BucketsBuilder 
bucketsBuilder,
final long bucketCheckInterval) {
 
@@ -170,7 +170,7 @@ public class StreamingFileSink
/**
 * The base abstract class for the {@link RowFormatBuilder} and {@link 
BulkFormatBuilder}.
 */
-   private abstract static class BucketsBuilder implements 
Serializable {
+   protected abstract static class BucketsBuilder implements 
Serializable {
 
private static final long serialVersionUID = 1L;
 



[flink] branch master updated (f652bb5 -> 7748729)

2019-05-22 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f652bb5  [FLINK-12447][build] Set minimum maven version to 3.1.1
 add 7748729  [FLINK-12539] [fs-connector] Make StreamingFileSink 
customizable (#8469)

No new revisions were added by this update.

Summary of changes:
 .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[flink] branch cron-master-maven_compat updated: [FLINK-12447][travis] Remove maven 3.0.5 build

2019-05-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch cron-master-maven_compat
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/cron-master-maven_compat by 
this push:
 new 1a5fdba  [FLINK-12447][travis] Remove maven 3.0.5 build
1a5fdba is described below

commit 1a5fdbae00aafa8b7fbcab6da687f173db60797a
Author: Robert Metzger <89049+rmetz...@users.noreply.github.com>
AuthorDate: Wed May 22 19:23:45 2019 +0200

[FLINK-12447][travis] Remove maven 3.0.5 build
---
 .travis.yml | 4 
 1 file changed, 4 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 5a7a54a..5dcfdb7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -36,10 +36,6 @@ matrix:
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - MVN_VERSION="3.0.5"
-- env:
-  - REMOTE="apache"
-  - BRANCH="master"
   - MVN_VERSION="3.1.1"
 - env:
   - REMOTE="apache"



[flink] branch master updated (02a0cf3 -> f652bb5)

2019-05-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 02a0cf3  [FLINK-12566][table] Remove row interval type
 add f652bb5  [FLINK-12447][build] Set minimum maven version to 3.1.1

No new revisions were added by this update.

Summary of changes:
 README.md| 4 ++--
 docs/flinkDev/building.md| 2 +-
 docs/flinkDev/building.zh.md | 2 +-
 pom.xml  | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)



[flink] 03/03: [hotfix][hive] unify instantiateHiveDatabase() in HiveCatalog

2019-05-22 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 702c489acd3adb4bd354aa1e2384e5344ad07395
Author: bowen.li 
AuthorDate: Wed May 22 11:46:06 2019 -0700

[hotfix][hive] unify instantiateHiveDatabase() in HiveCatalog
---
 .../flink/table/catalog/hive/HiveCatalog.java  | 51 +-
 1 file changed, 21 insertions(+), 30 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 07a7159..d6818f7 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -189,14 +189,7 @@ public class HiveCatalog implements Catalog {

checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
checkNotNull(database, "database cannot be null");
 
-   Database hiveDatabase;
-   if (database instanceof HiveCatalogDatabase) {
-   hiveDatabase = instantiateHiveDatabase(databaseName, 
(HiveCatalogDatabase) database);
-   } else if (database instanceof GenericCatalogDatabase) {
-   hiveDatabase = instantiateHiveDatabase(databaseName, 
(GenericCatalogDatabase) database);
-   } else {
-   throw new CatalogException(String.format("Unsupported 
catalog database type %s", database.getClass()), null);
-   }
+   Database hiveDatabase = instantiateHiveDatabase(databaseName, 
database);
 
try {
client.createDatabase(hiveDatabase);
@@ -209,21 +202,26 @@ public class HiveCatalog implements Catalog {
}
}
 
-   private static Database instantiateHiveDatabase(String databaseName, 
HiveCatalogDatabase database) {
-   return new Database(databaseName,
-   database.getComment(),
-   database.getLocation(),
-   database.getProperties());
-   }
-
-   private static Database instantiateHiveDatabase(String databaseName, 
GenericCatalogDatabase database) {
-   Map properties = database.getProperties();
+   private static Database instantiateHiveDatabase(String databaseName, 
CatalogDatabase database) {
+   if (database instanceof HiveCatalogDatabase) {
+   HiveCatalogDatabase db = (HiveCatalogDatabase) database;
+   return new Database(
+   databaseName,
+   db.getComment(),
+   db.getLocation(),
+   db.getProperties());
+   } else if (database instanceof GenericCatalogDatabase) {
+   GenericCatalogDatabase db = (GenericCatalogDatabase) 
database;
 
-   return new Database(databaseName,
-   database.getComment(),
-   // HDFS location URI which GenericCatalogDatabase 
shouldn't care
-   null,
-   maskFlinkProperties(properties));
+   return new Database(
+   databaseName,
+   db.getComment(),
+   // HDFS location URI which 
GenericCatalogDatabase shouldn't care
+   null,
+   maskFlinkProperties(db.getProperties()));
+   } else {
+   throw new CatalogException(String.format("Unsupported 
catalog database type %s", database.getClass()), null);
+   }
}
 
@Override
@@ -232,14 +230,7 @@ public class HiveCatalog implements Catalog {

checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
checkNotNull(newDatabase, "newDatabase cannot be null");
 
-   Database newHiveDatabase;
-   if (newDatabase instanceof HiveCatalogDatabase) {
-   newHiveDatabase = instantiateHiveDatabase(databaseName, 
(HiveCatalogDatabase) newDatabase);
-   } else if (newDatabase instanceof GenericCatalogDatabase) {
-   newHiveDatabase = instantiateHiveDatabase(databaseName, 
(GenericCatalogDatabase) newDatabase);
-   } else {
-   throw new CatalogException(String.format("Unsupported 
catalog database type %s", newDatabase.getClass()), null);
-   }
+   Database newHiveDatabase = 
instantiateHiveDatabase(databaseName

[flink] branch master updated (02a0cf3 -> 702c489)

2019-05-22 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 02a0cf3  [FLINK-12566][table] Remove row interval type
 new f652bb5  [FLINK-12447][build] Set minimum maven version to 3.1.1
 new 7748729  [FLINK-12539] [fs-connector] Make StreamingFileSink 
customizable (#8469)
 new 702c489  [hotfix][hive] unify instantiateHiveDatabase() in HiveCatalog

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 README.md  |  4 +-
 docs/flinkDev/building.md  |  2 +-
 docs/flinkDev/building.zh.md   |  2 +-
 .../flink/table/catalog/hive/HiveCatalog.java  | 51 +-
 .../sink/filesystem/StreamingFileSink.java |  4 +-
 pom.xml|  4 +-
 6 files changed, 29 insertions(+), 38 deletions(-)



[flink] 01/03: [FLINK-12447][build] Set minimum maven version to 3.1.1

2019-05-22 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f652bb5a6ff3855ccf0038a5c0469896f41df908
Author: Robert Metzger <89049+rmetz...@users.noreply.github.com>
AuthorDate: Wed May 22 19:24:53 2019 +0200

[FLINK-12447][build] Set minimum maven version to 3.1.1
---
 README.md| 4 ++--
 docs/flinkDev/building.md| 2 +-
 docs/flinkDev/building.zh.md | 2 +-
 pom.xml  | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 34bf2c4..034189e 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Prerequisites for building Flink:
 
 * Unix-like environment (we use Linux, Mac OS X, Cygwin)
 * git
-* Maven (we recommend version 3.2.5)
+* Maven (we recommend version 3.2.5 and require at least 3.1.1)
 * Java 8 (Java 9 and 10 are not yet supported)
 
 ```
@@ -80,7 +80,7 @@ mvn clean package -DskipTests # this will take up to 10 
minutes
 
 Flink is now installed in `build-target`
 
-*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain 
dependencies. Maven 3.0.3 creates the libraries properly.
+*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain 
dependencies. Maven 3.1.1 creates the libraries properly.
 To build unit tests with Java 8, use Java 8u51 or above to prevent failures in 
unit tests that use the PowerMock runner.*
 
 ## Developing Flink
diff --git a/docs/flinkDev/building.md b/docs/flinkDev/building.md
index c3a50d1..521bc9f 100644
--- a/docs/flinkDev/building.md
+++ b/docs/flinkDev/building.md
@@ -64,7 +64,7 @@ Flink [shades 
away](https://maven.apache.org/plugins/maven-shade-plugin/) some o
 
 The dependency shading mechanism was recently changed in Maven and requires 
users to build Flink slightly differently, depending on their Maven version:
 
-**Maven 3.0.x, 3.1.x, and 3.2.x**
+**Maven 3.1.x and 3.2.x**
 It is sufficient to call `mvn clean install -DskipTests` in the root directory 
of Flink code base.
 
 **Maven 3.3.x**
diff --git a/docs/flinkDev/building.zh.md b/docs/flinkDev/building.zh.md
index 75eddd5..c48de50 100644
--- a/docs/flinkDev/building.zh.md
+++ b/docs/flinkDev/building.zh.md
@@ -64,7 +64,7 @@ Flink [shades 
away](https://maven.apache.org/plugins/maven-shade-plugin/) some o
 
 The dependency shading mechanism was recently changed in Maven and requires 
users to build Flink slightly differently, depending on their Maven version:
 
-**Maven 3.0.x, 3.1.x, and 3.2.x**
+**Maven 3.1.x and 3.2.x**
 It is sufficient to call `mvn clean install -DskipTests` in the root directory 
of Flink code base.
 
 **Maven 3.3.x**
diff --git a/pom.xml b/pom.xml
index c7b6ac3..086e452 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1502,8 +1502,8 @@ under the License.
[maven-enforcer configuration with XML tags stripped by the mail archive; the required Maven version range changes from [3.0.3,) to [3.1.1,) and the ${java.version} requirement is unchanged]
[flink] 02/03: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)

2019-05-22 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 774872927d12ed6662addb8b601a9140226c8625
Author: Kailash Dayanand 
AuthorDate: Wed May 22 10:40:27 2019 -0700

[FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
---
 .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index dc0b1c6..f5b1bf9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -127,7 +127,7 @@ public class StreamingFileSink
/**
 * Creates a new {@code StreamingFileSink} that writes files to the 
given base directory.
 */
-   private StreamingFileSink(
+   protected StreamingFileSink(
final StreamingFileSink.BucketsBuilder 
bucketsBuilder,
final long bucketCheckInterval) {
 
@@ -170,7 +170,7 @@ public class StreamingFileSink
/**
 * The base abstract class for the {@link RowFormatBuilder} and {@link 
BulkFormatBuilder}.
 */
-   private abstract static class BucketsBuilder implements 
Serializable {
+   protected abstract static class BucketsBuilder implements 
Serializable {
 
private static final long serialVersionUID = 1L;
 



[flink-web] branch asf-site updated (2419879 -> a47c0cf)

2019-05-22 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from 2419879  Rebuild website
 new c28aae2  [Blog] State TTL in Flink 1.8.0.
 new a47c0cf  rebuild site

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 _posts/2019-05-17-state-ttl.md| 135 +++
 content/2019/05/19/state-ttl.html | 335 ++
 content/blog/index.html   |  36 ++--
 content/blog/page2/index.html |  38 +++--
 content/blog/page3/index.html |  42 +++--
 content/blog/page4/index.html |  42 +++--
 content/blog/page5/index.html |  40 +++--
 content/blog/page6/index.html |  40 +++--
 content/blog/page7/index.html |  40 +++--
 content/blog/page8/index.html |  40 +++--
 content/blog/page9/index.html |  25 +++
 content/index.html|   6 +-
 content/zh/index.html |   6 +-
 13 files changed, 699 insertions(+), 126 deletions(-)
 create mode 100644 _posts/2019-05-17-state-ttl.md
 create mode 100644 content/2019/05/19/state-ttl.html



[flink-web] 01/02: [Blog] State TTL in Flink 1.8.0.

2019-05-22 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit c28aae2eb9d2976828596d2182ed728e48bdd4dc
Author: Marta Paes Moreira 
AuthorDate: Mon May 20 13:23:12 2019 +0200

[Blog] State TTL in Flink 1.8.0.

Submitting the Markdown file for a blog post by Andrey and Fabian about state TTL.

This closes #216
---
 _posts/2019-05-17-state-ttl.md | 135 +
 1 file changed, 135 insertions(+)

diff --git a/_posts/2019-05-17-state-ttl.md b/_posts/2019-05-17-state-ttl.md
new file mode 100644
index 000..592f180
--- /dev/null
+++ b/_posts/2019-05-17-state-ttl.md
@@ -0,0 +1,135 @@
+---
+layout: post
+title: "State TTL in Flink 1.8.0: How to Automatically Cleanup Application 
State in Apache Flink"
+date: 2019-05-19T12:00:00.000Z
+authors:
+- fabian:
+  name: "Fabian Hueske"
+  twitter: "fhueske"
+- andrey:
+  name: "Andrey Zagrebin"
+
+
+excerpt: A common requirement for many stateful streaming applications is to automatically clean up application state for effective management of your state size, or to control how long the application state can be accessed. State TTL enables application state cleanup and efficient state size management in Apache Flink.
+---
+
+A common requirement for many stateful streaming applications is to automatically clean up application state for effective management of your state size, or to control how long the application state can be accessed (e.g. due to legal regulations like the GDPR). The state time-to-live (TTL) feature was initiated in Flink 1.6.0 and enabled application state cleanup and efficient state size management in Apache Flink.
+
+In this post, we motivate the State TTL feature and discuss its use cases. 
Moreover, we show how to use and configure it. We explain how Flink internally 
manages state with TTL and present some exciting additions to the feature in 
Flink 1.8.0. The blog post concludes with an outlook on future improvements and 
extensions.
+
+# The Transient Nature of State
+
+There are two major reasons why state should be maintained only for a limited 
time. For example, let’s imagine a Flink application that ingests a stream of 
user login events and stores for each user the time of the last login to 
improve the experience of frequent visitors.
+
+* **Controlling the size of state.**
+Being able to efficiently manage an ever-growing state size is a primary use 
case for state TTL. Oftentimes, data needs to be persisted temporarily while 
there is some user activity around it, e.g. web sessions. When the activity 
ends there is no longer interest in that data while it still occupies storage. 
Flink 1.8.0 introduces background cleanup of old state based on TTL that makes 
the eviction of no-longer-necessary data frictionless. Previously, the 
application developer had to take [...]
+
+* **Complying with data protection and sensitive data requirements.**
+Recent developments around data privacy regulations, such as the General Data 
Protection Regulation (GDPR) introduced by the European Union, make compliance 
with such data requirements or treating sensitive data a top priority for many 
use cases and applications. An example of such use cases includes applications 
that require keeping data for a specific timeframe and preventing access to it 
thereafter. This is a common challenge for companies providing short-term 
services to their custom [...]
+
+Both requirements can be addressed by a feature that periodically, yet 
continuously, removes the state for a key once it becomes unnecessary or 
unimportant and there is no requirement to keep it in storage any more.
+
+# State TTL for continuous cleanup of application state
+
+The 1.6.0 release of Apache Flink introduced the State TTL feature. It enabled developers of stream processing applications to configure the state of operators to expire and be cleaned up after a defined timeout (time-to-live). In Flink 1.8.0 the feature was extended to include continuous background cleanup of old entries, according to the TTL setting, for both the RocksDB and the heap state backends (FSStateBackend and MemoryStateBackend).
+
+In Flink’s DataStream API, application state is defined by a [state 
descriptor](https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#using-managed-keyed-state).
 State TTL is configured by passing a `StateTtlConfig` object to a state 
descriptor. The following Java example shows how to create a state TTL 
configuration and provide it to the state descriptor that holds the last login 
time of a user as a `Long` value:
+
+```java
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Time.days(7)) // state expires seven days after the last write
+    .build();
+
+ValueStateDescriptor<Long> lastUserLogin =
+    new ValueStateDescriptor<>("lastUserLogin", Long.class);
+lastUserLogin.enableTimeToLive(ttlConfig);
+```

[flink-web] 02/02: rebuild site

2019-05-22 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit a47c0cff9575dd447a1c4d275de3428ecca49269
Author: Robert Metzger 
AuthorDate: Wed May 22 15:24:00 2019 +0200

rebuild site
---
 content/2019/05/19/state-ttl.html | 335 ++
 content/blog/index.html   |  36 ++--
 content/blog/page2/index.html |  38 +++--
 content/blog/page3/index.html |  42 +++--
 content/blog/page4/index.html |  42 +++--
 content/blog/page5/index.html |  40 +++--
 content/blog/page6/index.html |  40 +++--
 content/blog/page7/index.html |  40 +++--
 content/blog/page8/index.html |  40 +++--
 content/blog/page9/index.html |  25 +++
 content/index.html|   6 +-
 content/zh/index.html |   6 +-
 12 files changed, 564 insertions(+), 126 deletions(-)

diff --git a/content/2019/05/19/state-ttl.html 
b/content/2019/05/19/state-ttl.html
new file mode 100644
index 000..3c0661a
--- /dev/null
+++ b/content/2019/05/19/state-ttl.html
@@ -0,0 +1,335 @@
+[rendered HTML of the post "State TTL in Flink 1.8.0: How to Automatically Cleanup Application State in Apache Flink": page scaffolding, site navigation, and the published post body, with markup stripped by the mail archive]

[flink] branch master updated: [FLINK-12566][table] Remove row interval type

2019-05-22 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 02a0cf3  [FLINK-12566][table] Remove row interval type
02a0cf3 is described below

commit 02a0cf3d4e26f0217b87b7aaef604a6f39a56977
Author: Timo Walther 
AuthorDate: Tue May 21 11:43:28 2019 +0200

[FLINK-12566][table] Remove row interval type

This closes #8497.
---
 .../table/expressions/ApiExpressionUtils.java  |  5 ++-
 .../table/expressions/ValueLiteralExpression.java  |  3 --
 .../flink/table/typeutils/RowIntervalTypeInfo.java | 41 --
 .../codegen/agg/batch/WindowCodeGenerator.scala| 20 +--
 .../flink/table/plan/util/AggregateUtil.scala  | 33 +
 .../expressions/rules/OverWindowResolverRule.java  |  4 +--
 .../operations/AggregateOperationFactory.java  | 12 +++
 .../table/expressions/PlannerExpressionUtils.scala |  7 ++--
 .../org/apache/flink/table/expressions/call.scala  | 14 
 .../apache/flink/table/expressions/literals.scala  |  3 +-
 .../flink/table/expressions/overOffsets.scala  |  7 ++--
 .../table/runtime/aggregate/AggregateUtil.scala|  7 ++--
 .../table/api/batch/table/GroupWindowTest.scala|  8 ++---
 .../table/GroupWindowTableAggregateTest.scala  |  8 ++---
 .../table/api/stream/table/GroupWindowTest.scala   | 10 +++---
 15 files changed, 67 insertions(+), 115 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 2bbfaa2..1ef8db3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 
 import java.util.Arrays;
@@ -124,10 +123,10 @@ public final class ApiExpressionUtils {
 
public static Expression toRowInterval(Expression e) {
final Optional intInterval = 
ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
-   .map((v) -> valueLiteral((long) v, 
RowIntervalTypeInfo.INTERVAL_ROWS));
+   .map((v) -> valueLiteral((long) v, 
BasicTypeInfo.LONG_TYPE_INFO));
 
final Optional longInterval = 
ExpressionUtils.extractValue(e, BasicTypeInfo.LONG_TYPE_INFO)
-   .map((v) -> valueLiteral(v, 
RowIntervalTypeInfo.INTERVAL_ROWS));
+   .map((v) -> valueLiteral(v, 
BasicTypeInfo.LONG_TYPE_INFO));
 
if (intInterval.isPresent()) {
return intInterval.get();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
index f8138f6..186c90c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 import org.apache.flink.util.Preconditions;
 
@@ -109,8 +108,6 @@ public final class ValueLiteralExpression implements 
Expression {
return value + ".millis";
} else if (type.equals(TimeIntervalTypeInfo.INTERVAL_MONTHS)) {
return value + ".months";
-   } else if (type.equals(RowIntervalTypeInfo.INTERVAL_ROWS)) {
-   return value + ".rows";
} else {
return stringifyValue(value);
}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java
deleted file mode 100644
index a7cd362..000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/

[flink] branch release-1.7 updated: [FLINK-12578][build] Use https URL for Maven repositories

2019-05-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
 new 69b9a39  [FLINK-12578][build] Use https URL for Maven repositories
69b9a39 is described below

commit 69b9a3908cca80dc41db1d15b2968670887cff6e
Author: Jungtaek Lim 
AuthorDate: Wed May 22 19:16:02 2019 +0900

[FLINK-12578][build] Use https URL for Maven repositories

MapR repositories are excluded from this change as there appears to be some 
certificate issue on the MapR side. The latest documentation 
(https://mapr.com/docs/61/DevelopmentGuide/MavenArtifacts.html) also recommends 
using the HTTP URL.
---
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml | 2 +-
 flink-formats/flink-avro-confluent-registry/pom.xml| 2 +-
 pom.xml| 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml 
b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
index ea81997..832b243 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
@@ -35,7 +35,7 @@ under the License.


confluent
-   http://packages.confluent.io/maven/
+   https://packages.confluent.io/maven/


 
diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml 
b/flink-formats/flink-avro-confluent-registry/pom.xml
index 9b93e35..7307772 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -32,7 +32,7 @@ under the License.


confluent
-   http://packages.confluent.io/maven/
+   https://packages.confluent.io/maven/


 
diff --git a/pom.xml b/pom.xml
index 5fd305a..d93cdd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -946,14 +946,14 @@ under the License.

HDPReleases
HDP Releases
-   
http://repo.hortonworks.com/content/repositories/releases/
+   
https://repo.hortonworks.com/content/repositories/releases/

false

true


HortonworksJettyHadoop
HDP Jetty
-   
http://repo.hortonworks.com/content/repositories/jetty-hadoop
+   
https://repo.hortonworks.com/content/repositories/jetty-hadoop

false

true




[flink] branch release-1.8 updated: [FLINK-12578][build] Use https URL for Maven repositories

2019-05-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 923ffec  [FLINK-12578][build] Use https URL for Maven repositories
923ffec is described below

commit 923ffec16b1c57b1d3d705ec7f32db5cea951225
Author: Jungtaek Lim 
AuthorDate: Wed May 22 19:16:02 2019 +0900

[FLINK-12578][build] Use https URL for Maven repositories

MapR repositories are excluded from this change as there appears to be some 
certificate issue on the MapR side. The latest documentation 
(https://mapr.com/docs/61/DevelopmentGuide/MavenArtifacts.html) also recommends 
using the HTTP URL.
---
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml | 2 +-
 flink-formats/flink-avro-confluent-registry/pom.xml| 2 +-
 pom.xml| 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml 
b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
index 0dc5869..7b61824 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
@@ -35,7 +35,7 @@ under the License.


confluent
-   http://packages.confluent.io/maven/
+   https://packages.confluent.io/maven/


 
diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml 
b/flink-formats/flink-avro-confluent-registry/pom.xml
index 4c99f71..10dad8d 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -32,7 +32,7 @@ under the License.


confluent
-   http://packages.confluent.io/maven/
+   https://packages.confluent.io/maven/


 
diff --git a/pom.xml b/pom.xml
index 6a9f52c..21f4265 100644
--- a/pom.xml
+++ b/pom.xml
@@ -974,14 +974,14 @@ under the License.

HDPReleases
HDP Releases
-   
http://repo.hortonworks.com/content/repositories/releases/
+   
https://repo.hortonworks.com/content/repositories/releases/

false

true


HortonworksJettyHadoop
HDP Jetty
-   
http://repo.hortonworks.com/content/repositories/jetty-hadoop
+   
https://repo.hortonworks.com/content/repositories/jetty-hadoop

false

true




[flink] 03/10: [hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6b8423ee34c2c49f0ea6eb0955ced5db57dd18f
Author: Andrey Zagrebin 
AuthorDate: Sat May 11 19:49:30 2019 +0200

[hotfix][tests][network] Introduce InputChannelBuilder for creation of 
InputChannels in tests
---
 .../partition/consumer/RemoteInputChannel.java |  11 --
 .../runtime/io/network/NetworkEnvironmentTest.java |  20 +--
 .../netty/PartitionRequestClientHandlerTest.java   |  22 +---
 .../network/partition/InputChannelTestUtils.java   |  47 ++-
 .../network/partition/InputGateConcurrentTest.java |  12 +-
 .../network/partition/InputGateFairnessTest.java   |  10 +-
 .../partition/consumer/InputChannelBuilder.java| 145 +
 .../partition/consumer/LocalInputChannelTest.java  |  19 +--
 .../partition/consumer/RemoteInputChannelTest.java |  68 +++---
 .../partition/consumer/SingleInputGateTest.java| 107 ---
 10 files changed, 228 insertions(+), 233 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 87e3f62..30246c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -108,17 +108,6 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
ResultPartitionID partitionId,
ConnectionID connectionId,
ConnectionManager connectionManager,
-   InputChannelMetrics metrics) {
-
-   this(inputGate, channelIndex, partitionId, connectionId, 
connectionManager, 0, 0, metrics);
-   }
-
-   public RemoteInputChannel(
-   SingleInputGate inputGate,
-   int channelIndex,
-   ResultPartitionID partitionId,
-   ConnectionID connectionId,
-   ConnectionManager connectionManager,
int initialBackOff,
int maxBackoff,
InputChannelMetrics metrics) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a610838..81cd8f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -288,7 +287,7 @@ public class NetworkEnvironmentTest {
 * @param numberOfChannels
 *  the number of input channels
 *
-* @return input gate with some fake settings
+* @return input gate with some fake settiFngs
 */
private SingleInputGate createSingleInputGate(ResultPartitionType 
partitionType, int numberOfChannels) {
return spy(new SingleInputGateBuilder()
@@ -303,15 +302,10 @@ public class NetworkEnvironmentTest {
int channelIndex,
ResultPartition resultPartition,
ConnectionManager connManager) {
-   RemoteInputChannel channel = new RemoteInputChannel(
-   inputGate,
-   channelIndex,
-   resultPartition.getPartitionId(),
-   mock(ConnectionID.class),
-   connManager,
-   0,
-   0,
-   
InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-   
inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), 
channel);
+   InputChannelBuilder.newBuilder()
+   .setChannelIndex(channelIndex)
+   .setPartitionId(resultPartition.getPartitionId
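The point of the builder is that a test states only the fields it asserts on, and everything else keeps a sensible default. A sketch of typical usage; only the setters appear in this diff, so the terminal method name is an assumption:

```java
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

class InputChannelBuilderSketch {
    // Construct a channel for a test, overriding only what the test cares about.
    static RemoteInputChannel channelForTest(SingleInputGate gate, ResultPartition partition) {
        return InputChannelBuilder.newBuilder()
                .setChannelIndex(0)                         // setter from this diff
                .setPartitionId(partition.getPartitionId()) // setter from this diff
                .buildRemoteAndSetToGate(gate);             // terminal method assumed
    }
}
```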

[flink] 02/10: [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efa7abf2c927958447d6533a3cbd5ccc105a254f
Author: Zhijiang 
AuthorDate: Fri Apr 26 18:31:33 2019 +0800

[hotfix][tests][network] Introduce ResultPartitionBuilder for creation of 
ResultPartition in tests
---
 .../runtime/io/network/NetworkEnvironmentTest.java |   9 +-
 .../io/network/partition/PartitionTestUtils.java   |  43 +++-
 .../network/partition/ResultPartitionBuilder.java  | 113 +
 .../partition/consumer/LocalInputChannelTest.java  |  32 ++
 .../StreamNetworkBenchmarkEnvironment.java |  22 ++--
 5 files changed, 148 insertions(+), 71 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index bcb4d04..a610838 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -272,8 +272,11 @@ public class NetworkEnvironmentTest {
 */
private static ResultPartition createResultPartition(
final ResultPartitionType partitionType, final int 
channels) {
-
-   return PartitionTestUtils.createPartition(partitionType, 
channels);
+   return new ResultPartitionBuilder()
+   .setResultPartitionType(partitionType)
+   .setNumberOfSubpartitions(channels)
+   .setNumTargetKeyGroups(channels)
+   .build();
}
 
/**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 3391030..b52bffe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
-
 /**
  * This class should consolidate all mocking logic for ResultPartitions.
  * While using Mockito internally (for now), the use of Mockito should not
@@ -34,41 +30,26 @@ public class PartitionTestUtils {
}
 
public static ResultPartition createPartition(ResultPartitionType type) 
{
-   return createPartition(
-   new NoOpResultPartitionConsumableNotifier(),
-   type,
-   false);
-   }
-
-   public static ResultPartition createPartition(ResultPartitionType type, 
int numChannels) {
-   return createPartition(new 
NoOpResultPartitionConsumableNotifier(), type, numChannels, false);
+   return new 
ResultPartitionBuilder().setResultPartitionType(type).build();
}
 
public static ResultPartition createPartition(
ResultPartitionConsumableNotifier notifier,
ResultPartitionType type,
boolean sendScheduleOrUpdateConsumersMessage) {
-
-   return createPartition(notifier, type, 1, 
sendScheduleOrUpdateConsumersMessage);
+   return new ResultPartitionBuilder()
+   .setResultPartitionConsumableNotifier(notifier)
+   .setResultPartitionType(type)
+   
.setSendScheduleOrUpdateConsumersMessage(sendScheduleOrUpdateConsumersMessage)
+   .build();
}
 
public static ResultPartition createPartition(
-   ResultPartitionConsumableNotifier notifier,
-   ResultPartitionType type,
-   int numChannels,
-   boolean sendScheduleOrUpdateConsumersMessage) {
-
-   return new ResultP

[flink] 10/10: [FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup()

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d81425de91ecbab715f8a7d169e0a7796d19c51
Author: Andrey Zagrebin 
AuthorDate: Fri May 10 22:14:07 2019 +0200

[FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to 
InputGate#setup()

Move input gate setup from NetworkEnvironment to InputGate.
This eliminates the tie between Task and NetworkEnvironment:
Task no longer needs to depend on NetworkEnvironment
and can trigger setup from InputGate directly.
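
    In code terms the inversion is small but load-bearing: instead of handing its
    gates to the environment, the task asks each gate to set itself up. A sketch
    of the resulting call pattern (the loop is an illustration; InputGate#setup()
    is the method added in this commit):

        for (InputGate gate : inputGates) {
            gate.setup();   // was: networkEnvironment.setupInputGate(gate)
        }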
---
 .../runtime/io/network/NetworkEnvironment.java | 57 --
 .../io/network/partition/consumer/InputGate.java   |  5 ++
 .../partition/consumer/SingleInputGate.java| 19 +++-
 .../partition/consumer/SingleInputGateFactory.java | 55 ++---
 .../network/partition/consumer/UnionInputGate.java |  4 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 28 ++-
 .../runtime/io/network/NetworkEnvironmentTest.java | 49 +--
 .../network/partition/InputGateFairnessTest.java   |  8 ++-
 .../network/partition/ResultPartitionBuilder.java  |  2 +-
 .../partition/consumer/InputGateTestBase.java  | 16 --
 .../partition/consumer/SingleInputGateBuilder.java | 25 +-
 .../partition/consumer/SingleInputGateTest.java| 16 +++---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 52 ++--
 .../runtime/io/BarrierBufferMassiveRandomTest.java |  4 ++
 .../flink/streaming/runtime/io/MockInputGate.java  |  4 ++
 .../StreamNetworkBenchmarkEnvironment.java |  2 +-
 16 files changed, 219 insertions(+), 127 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index de669bb..1f2ee7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge;
@@ -47,9 +46,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -190,60 +187,6 @@ public class NetworkEnvironment {
return config;
}
 
-	// ------------------------------------------------------------------------
-	//  Task operations
-	// ------------------------------------------------------------------------
-
-   public void registerTask(Task task) throws IOException {
-   final ResultPartition[] producedPartitions = 
task.getProducedPartitions();
-
-   synchronized (lock) {
-   if (isShutdown) {
-   throw new 
IllegalStateException("NetworkEnvironment is shut down");
-   }
-
-   for (final ResultPartition partition : 
producedPartitions) {
-   partition.setup();
-   }
-
-   // Setup the buffer pool for each buffer reader
-   final SingleInputGate[] inputGates = 
task.getAllInputGates();
-   for (SingleInputGate gate : inputGates) {
-   setupInputGate(gate);
-   }
-   }
-   }
-
-   @VisibleForTesting
-   public void setupInputGate(SingleInputGate gate) throws IOException {
-   BufferPool bufferPool = null;
-   int maxNumberOfMemorySegments;
-   try {
-   if (config.isCreditBased()) {
-   maxNumberOfMemorySegments = 
gate.getConsumedPartitionType().isBounded() ?
-   config.floatingNetworkBuffersPerGate() 
: Integer.MAX_VALUE;
-
-

[flink] 08/10: [hotfix][network] Move MemorySegmentProvider from SingleInputGate to RemoteInputChannel

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8589c82b4cbc8a73417974107186f9ccc3a629d3
Author: Andrey Zagrebin 
AuthorDate: Fri May 10 20:25:59 2019 +0200

[hotfix][network] Move MemorySegmentProvider from SingleInputGate to 
RemoteInputChannel
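
    After the move, the channel pulls exclusive segments from the provider it was
    constructed with, and the gate only delegates. Roughly (simplified; the field
    is the one added in the diff below, the method body is elided here):

        // inside RemoteInputChannel
        void assignExclusiveSegments() throws IOException {
            Collection<MemorySegment> segments = memorySegmentProvider.requestMemorySegments();
            // ... register the segments as this channel's exclusive buffers ...
        }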
---
 .../runtime/io/network/NetworkEnvironment.java |  6 +-
 .../partition/consumer/RemoteInputChannel.java | 21 +--
 .../partition/consumer/SingleInputGate.java| 33 ++
 .../partition/consumer/SingleInputGateFactory.java | 14 -
 .../partition/consumer/UnknownInputChannel.java| 13 +++-
 .../runtime/io/network/NetworkEnvironmentTest.java | 41 +++--
 ...editBasedPartitionRequestClientHandlerTest.java | 21 ---
 .../netty/PartitionRequestClientHandlerTest.java   | 54 ++--
 .../network/netty/PartitionRequestClientTest.java  | 17 --
 .../network/partition/InputChannelTestUtils.java   | 71 ++
 .../partition/consumer/InputChannelBuilder.java| 14 -
 .../partition/consumer/RemoteInputChannelTest.java | 63 +--
 .../partition/consumer/SingleInputGateTest.java|  3 +-
 .../StreamNetworkBenchmarkEnvironment.java |  3 +-
 14 files changed, 225 insertions(+), 149 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 97016ef..459669c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -138,8 +138,8 @@ public class NetworkEnvironment {
ResultPartitionFactory resultPartitionFactory =
new ResultPartitionFactory(resultPartitionManager, 
ioManager);
 
-   SingleInputGateFactory singleInputGateFactory =
-   new SingleInputGateFactory(config, connectionManager, 
resultPartitionManager, taskEventPublisher);
+   SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
+   config, connectionManager, resultPartitionManager, 
taskEventPublisher, networkBufferPool);
 
return new NetworkEnvironment(
config,
@@ -247,7 +247,7 @@ public class NetworkEnvironment {
config.floatingNetworkBuffersPerGate() 
: Integer.MAX_VALUE;
 
// assign exclusive buffers to input channels 
directly and use the rest for floating buffers
-   gate.assignExclusiveSegments(networkBufferPool);
+   gate.assignExclusiveSegments();
bufferPool = 
networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
} else {
maxNumberOfMemorySegments = 
gate.getConsumedPartitionType().isBounded() ?
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 98182c6..397c4fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -34,6 +35,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.util.ExceptionUtils;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -103,6 +105,10 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
@GuardedBy("bufferQueue")
private boolean isWaitingForFloatingBuffers;
 
+   /** Global memory segment provider to request and recycle exclusive 
buffers (only for credit-based). */
+   @Nonnull
+   private final MemorySegmentProvider memorySegmentProvider;
+
public RemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
@@ -111,23 +117,26 @@ public class RemoteInputChannel extends InputChannel 
implements Buf

[flink] 09/10: [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e99eee4f0bf9b2d851fa12ea52007e8f083c366f
Author: Andrey Zagrebin 
AuthorDate: Fri May 10 20:56:25 2019 +0200

[FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to 
ResultPartitionWriter#setup()

Move partition setup from NetworkEnvironment to ResultPartition.
This eliminates the tie between Task and NetworkEnvironment:
Task no longer needs to depend on NetworkEnvironment
and can trigger setup through the ResultPartitionWriter interface.
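
    The shape of the change mirrors the input-gate refactoring: the task asks each
    produced partition to set itself up through the writer interface. Illustrative
    sketch (the loop itself is assumed; partition.setup() appears verbatim in the
    diff below):

        for (ResultPartition partition : producedPartitions) {
            partition.setup();   // was: networkEnvironment.setupPartition(partition)
        }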
---
 .../runtime/io/network/NetworkEnvironment.java | 47 +---
 .../network/api/writer/ResultPartitionWriter.java  |  5 ++
 .../io/network/partition/ResultPartition.java  | 22 +---
 .../network/partition/ResultPartitionFactory.java  | 49 +++--
 .../runtime/io/network/NetworkEnvironmentTest.java | 39 --
 .../AbstractCollectingResultPartitionWriter.java   |  4 ++
 .../io/network/api/writer/RecordWriterTest.java|  8 +++
 .../io/network/partition/PartitionTestUtils.java   |  4 ++
 .../network/partition/ResultPartitionBuilder.java  | 63 --
 .../io/network/partition/ResultPartitionTest.java  |  5 +-
 .../partition/consumer/LocalInputChannelTest.java  |  9 ++--
 .../StreamNetworkBenchmarkEnvironment.java |  3 +-
 12 files changed, 168 insertions(+), 90 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 459669c..de669bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -57,7 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -135,11 +134,19 @@ public class NetworkEnvironment {
registerNetworkMetrics(metricGroup, networkBufferPool);
 
ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
-   ResultPartitionFactory resultPartitionFactory =
-   new ResultPartitionFactory(resultPartitionManager, 
ioManager);
+   ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
+   resultPartitionManager,
+   ioManager,
+   networkBufferPool,
+   config.networkBuffersPerChannel(),
+   config.floatingNetworkBuffersPerGate());
 
SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
-   config, connectionManager, resultPartitionManager, 
taskEventPublisher, networkBufferPool);
+   config,
+   connectionManager,
+   resultPartitionManager,
+   taskEventPublisher,
+   networkBufferPool);
 
return new NetworkEnvironment(
config,
@@ -196,7 +203,7 @@ public class NetworkEnvironment {
}
 
for (final ResultPartition partition : 
producedPartitions) {
-   setupPartition(partition);
+   partition.setup();
}
 
// Setup the buffer pool for each buffer reader
@@ -208,36 +215,6 @@ public class NetworkEnvironment {
}
 
@VisibleForTesting
-   public void setupPartition(ResultPartition partition) throws 
IOException {
-   BufferPool bufferPool = null;
-
-   try {
-   int maxNumberOfMemorySegments = 
partition.getPartitionType().isBounded() ?
-   partition.getNumberOfSubpartitions() * 
config.networkBuffersPerChannel() +
-   config.floatingNetworkBuffersPerGate() 
: Integer.MAX_VALUE;
-   // If the partition type is back pressure-free, we 
register with the buffer pool for
-   // callbacks to release memory.
-   bufferPool = 
networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
-   maxNumberOfMemorySegments,
-   partition.getPartitionType().hasBackPressure() 
? Optional.empty() : Optional.of(partition));
-
-   partition.registerBufferPool(bufferPool);
-
-   
resultPartitionManager.registerResultPartition(partition);
-   } catch (Throwable t) {
-   if (bufferPool != null) {
-

[flink] 04/10: [hotfix][tests][network] Introduce NetworkEnvironment.create factory method

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e677bb6dcc38dfb3b5bf8b20e4a2d324b9e20ab5
Author: Andrey Zagrebin 
AuthorDate: Sun May 12 12:55:01 2019 +0200

[hotfix][tests][network] Introduce NetworkEnvironment.create factory method
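
    The factory method keeps all collaborator wiring (connection manager, buffer
    pool, partition manager) in one place and leaves the constructor to accept
    fully built collaborators, which is what test builders need. The entry point,
    as it appears in the diff below:

        NetworkEnvironment network = NetworkEnvironment.create(
            config,              // NetworkEnvironmentConfiguration
            taskEventPublisher,
            metricGroup,
            ioManager);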
---
 .../runtime/io/network/NetworkEnvironment.java | 47 +++---
 .../runtime/taskexecutor/TaskManagerServices.java  |  2 +-
 .../io/network/NetworkEnvironmentBuilder.java  |  2 +-
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b9b35c9..ea482f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -92,31 +92,50 @@ public class NetworkEnvironment {
 
private boolean isShutdown;
 
-   public NetworkEnvironment(
+   private NetworkEnvironment(
NetworkEnvironmentConfiguration config,
+   NetworkBufferPool networkBufferPool,
+   ConnectionManager connectionManager,
+   ResultPartitionManager resultPartitionManager,
TaskEventPublisher taskEventPublisher,
-   MetricGroup metricGroup,
IOManager ioManager) {
-   this.config = checkNotNull(config);
+   this.config = config;
+   this.networkBufferPool = networkBufferPool;
+   this.connectionManager = connectionManager;
+   this.resultPartitionManager = resultPartitionManager;
+   this.taskEventPublisher = taskEventPublisher;
+   this.ioManager = ioManager;
+   this.isShutdown = false;
+   }
 
-   this.networkBufferPool = new 
NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+   public static NetworkEnvironment create(
+   NetworkEnvironmentConfiguration config,
+   TaskEventPublisher taskEventPublisher,
+   MetricGroup metricGroup,
+   IOManager ioManager) {
+   checkNotNull(ioManager);
+   checkNotNull(taskEventPublisher);
+   checkNotNull(config);
 
NettyConfig nettyConfig = config.nettyConfig();
-   if (nettyConfig != null) {
-   this.connectionManager = new 
NettyConnectionManager(nettyConfig, config.isCreditBased());
-   } else {
-   this.connectionManager = new LocalConnectionManager();
-   }
-
-   this.resultPartitionManager = new ResultPartitionManager();
+   ConnectionManager connectionManager = nettyConfig != null ?
+   new NettyConnectionManager(nettyConfig, 
config.isCreditBased()) : new LocalConnectionManager();
 
-   this.taskEventPublisher = checkNotNull(taskEventPublisher);
+   NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+   config.numNetworkBuffers(),
+   config.networkBufferSize());
 
registerNetworkMetrics(metricGroup, networkBufferPool);
 
-   this.ioManager = checkNotNull(ioManager);
+   ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
 
-   isShutdown = false;
+   return new NetworkEnvironment(
+   config,
+   networkBufferPool,
+   connectionManager,
+   resultPartitionManager,
+   taskEventPublisher,
+   ioManager);
}
 
private static void registerNetworkMetrics(MetricGroup metricGroup, 
NetworkBufferPool networkBufferPool) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e2ec942..e19e8fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -246,7 +246,7 @@ public class TaskManagerServices {
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-   final NetworkEnvironment network = new NetworkEnvironment(
+   final NetworkEnvironment network = NetworkEnvironment.create(
   

[flink] 07/10: [hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2375693f1cc80118bf61922f027942a0f101c486
Author: Andrey Zagrebin 
AuthorDate: Fri May 10 19:01:31 2019 +0200

[hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to 
assign exclusive segments
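
    The new interface (full text in the diff below) is a symmetric request/recycle
    pair over batches of segments. A minimal sketch of how a caller is expected to
    pair the two calls; the try/finally discipline is an assumption of this sketch,
    not something the interface enforces:

        import java.io.IOException;
        import java.util.Collection;

        import org.apache.flink.core.memory.MemorySegment;
        import org.apache.flink.core.memory.MemorySegmentProvider;

        class ExclusiveSegmentUser {
            void withExclusiveSegments(MemorySegmentProvider provider) throws IOException {
                Collection<MemorySegment> segments = provider.requestMemorySegments();
                try {
                    // ... hand the segments to an input channel as exclusive buffers ...
                } finally {
                    provider.recycleMemorySegments(segments);
                }
            }
        }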
---
 .../flink/core/memory/MemorySegmentProvider.java   | 31 +++
 .../runtime/io/network/NetworkEnvironment.java |  5 +-
 .../io/network/buffer/NetworkBufferPool.java   | 35 +++-
 .../partition/consumer/RemoteInputChannel.java |  5 +-
 .../partition/consumer/SingleInputGate.java| 29 --
 .../runtime/io/network/NetworkEnvironmentTest.java | 16 +++---
 .../io/network/buffer/BufferPoolFactoryTest.java   | 12 ++--
 .../network/buffer/LocalBufferPoolDestroyTest.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java |  2 +-
 .../io/network/buffer/NetworkBufferPoolTest.java   | 65 ++
 ...editBasedPartitionRequestClientHandlerTest.java | 15 ++---
 .../netty/PartitionRequestClientHandlerTest.java   |  5 +-
 .../network/netty/PartitionRequestClientTest.java  | 12 ++--
 .../partition/consumer/LocalInputChannelTest.java  |  2 +-
 .../partition/consumer/RemoteInputChannelTest.java | 40 ++---
 .../BackPressureStatsTrackerImplITCase.java|  2 +-
 .../runtime/io/BarrierBufferMassiveRandomTest.java |  4 +-
 17 files changed, 150 insertions(+), 132 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
new file mode 100644
index 000..83b2d19
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The provider used for requesting and releasing batch of memory segments.
+ */
+public interface MemorySegmentProvider {
+	Collection<MemorySegment> requestMemorySegments() throws IOException;
+
+	void recycleMemorySegments(Collection<MemorySegment> segments) throws IOException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d0e8263..97016ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -129,7 +129,8 @@ public class NetworkEnvironment {
 
NetworkBufferPool networkBufferPool = new NetworkBufferPool(
config.numNetworkBuffers(),
-   config.networkBufferSize());
+   config.networkBufferSize(),
+   config.networkBuffersPerChannel());
 
registerNetworkMetrics(metricGroup, networkBufferPool);
 
@@ -246,7 +247,7 @@ public class NetworkEnvironment {
config.floatingNetworkBuffersPerGate() 
: Integer.MAX_VALUE;
 
// assign exclusive buffers to input channels 
directly and use the rest for floating buffers
-   gate.assignExclusiveSegments(networkBufferPool, 
config.networkBuffersPerChannel());
+   gate.assignExclusiveSegments(networkBufferPool);
bufferPool = 
networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
} else {
maxNumberOfMemorySegments = 
gate.getConsumedPartitionType().isBounded() ?
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 48ce27e..3d6c2da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java

[flink] branch master updated (e16fa9f -> 3d81425)

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e16fa9f  [FLINK-12439][python] Add FileSystem Connector with CSV 
format support in Python Table API
 new 15811a5  [hotfix][tests][network] Introduce SingleInputGateBuilder for 
creation of SingleInputGate in tests
 new efa7abf  [hotfix][tests][network] Introduce ResultPartitionBuilder for 
creation of ResultPartition in tests
 new e6b8423  [hotfix][tests][network] Introduce InputChannelBuilder for 
creation of InputChannels in tests
 new e677bb6  [hotfix][tests][network] Introduce NetworkEnvironment.create 
factory method
 new e546009  [hotfix][network] Introduce ResultPartitionFactory
 new c00d879  [hotfix][network] Introduce SingleInputGateFactory
 new 2375693  [hotfix][network] Introduce MemorySegmentProvider for 
RemoteInputChannel to assign exclusive segments
 new 8589c82  [hotfix][network] Move MemorySegmentProvider from 
SingleInputGate to RemoteInputChannel
 new e99eee4  [FLINK-12331][network] Refactor 
NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()
 new 3d81425  [FLINK-12331][network] Refactor 
NetworkEnvironment#setupInputGate() to InputGate#setup()

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...{MemoryType.java => MemorySegmentProvider.java} |  19 +-
 .../runtime/io/network/NetworkEnvironment.java | 179 ++
 .../network/api/writer/ResultPartitionWriter.java  |   5 +
 .../io/network/buffer/NetworkBufferPool.java   |  35 ++--
 .../io/network/partition/ResultPartition.java  |  82 ++--
 .../network/partition/ResultPartitionFactory.java  | 201 
 .../io/network/partition/consumer/InputGate.java   |   5 +
 .../partition/consumer/RemoteInputChannel.java |  33 ++--
 .../partition/consumer/SingleInputGate.java| 153 +++
 .../partition/consumer/SingleInputGateFactory.java | 207 +
 .../network/partition/consumer/UnionInputGate.java |   4 +
 .../partition/consumer/UnknownInputChannel.java|  13 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |   2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  28 +--
 .../io/network/NetworkEnvironmentBuilder.java  |   2 +-
 .../runtime/io/network/NetworkEnvironmentTest.java | 153 +++
 .../AbstractCollectingResultPartitionWriter.java   |   4 +
 .../io/network/api/writer/RecordWriterTest.java|   8 +
 .../io/network/buffer/BufferPoolFactoryTest.java   |  12 +-
 .../network/buffer/LocalBufferPoolDestroyTest.java |   2 +-
 .../io/network/buffer/LocalBufferPoolTest.java |   2 +-
 .../io/network/buffer/NetworkBufferPoolTest.java   |  65 +++
 ...editBasedPartitionRequestClientHandlerTest.java |  30 +--
 .../netty/PartitionRequestClientHandlerTest.java   |  67 +--
 .../network/netty/PartitionRequestClientTest.java  |  25 ++-
 .../network/partition/InputChannelTestUtils.java   | 140 --
 .../network/partition/InputGateConcurrentTest.java |  12 +-
 .../network/partition/InputGateFairnessTest.java   |  18 +-
 .../io/network/partition/PartitionTestUtils.java   |  45 ++---
 .../network/partition/ResultPartitionBuilder.java  | 165 
 .../io/network/partition/ResultPartitionTest.java  |   5 +-
 .../partition/consumer/InputChannelBuilder.java| 155 +++
 .../partition/consumer/InputGateTestBase.java  |  20 +-
 .../partition/consumer/LocalInputChannelTest.java  |  78 +++-
 .../partition/consumer/RemoteInputChannelTest.java | 151 +++
 .../partition/consumer/SingleInputGateBuilder.java | 105 +++
 .../partition/consumer/SingleInputGateTest.java| 143 +-
 .../BackPressureStatsTrackerImplITCase.java|   2 +-
 .../apache/flink/runtime/taskmanager/TaskTest.java |  52 +-
 .../runtime/io/BarrierBufferMassiveRandomTest.java |   8 +-
 .../flink/streaming/runtime/io/MockInputGate.java  |   4 +
 .../StreamNetworkBenchmarkEnvironment.java |  50 ++---
 42 files changed, 1517 insertions(+), 972 deletions(-)
 copy flink-core/src/main/java/org/apache/flink/core/memory/{MemoryType.java => 
MemorySegmentProvider.java} (72%)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.ja

[flink] 01/10: [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 15811a5d8c7362ac64828b771951dfa8304d98eb
Author: Zhijiang 
AuthorDate: Fri Apr 26 17:13:08 2019 +0800

[hotfix][tests][network] Introduce SingleInputGateBuilder for creation of 
SingleInputGate in tests
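
    As with the other test builders in this series, the point is to replace the
    nine-argument SingleInputGate constructor call (visible in the removed block
    below) with named, defaulted setters:

        SingleInputGate gate = new SingleInputGateBuilder()
            .setNumberOfChannels(numberOfChannels)
            .setResultPartitionType(partitionType)
            .setIsCreditBased(enableCreditBasedFlowControl)
            .build();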
---
 .../runtime/io/network/NetworkEnvironmentTest.java |  7 +-
 .../network/partition/InputChannelTestUtils.java   | 24 +--
 .../partition/consumer/InputGateTestBase.java  | 10 +--
 .../partition/consumer/LocalInputChannelTest.java  | 16 ++---
 .../partition/consumer/SingleInputGateBuilder.java | 82 ++
 5 files changed, 99 insertions(+), 40 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f20feb7..bcb4d04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import org.junit.Rule;
@@ -287,7 +288,11 @@ public class NetworkEnvironmentTest {
 * @return input gate with some fake settings
 */
private SingleInputGate createSingleInputGate(ResultPartitionType 
partitionType, int numberOfChannels) {
-   return 
spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, 
partitionType, enableCreditBasedFlowControl));
+   return spy(new SingleInputGateBuilder()
+   .setNumberOfChannels(numberOfChannels)
+   .setResultPartitionType(partitionType)
+   .setIsCreditBased(enableCreditBasedFlowControl)
+   .build());
}
 
private static void createRemoteInputChannel(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 4feee4e..82b4c5b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -28,9 +26,8 @@ import 
org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -75,24 +72,7 @@ public class InputChannelTestUtils {
}
 
public static SingleInputGate createSingleInputGate(int 
numberOfChannels) {
-   return createSingleInputGate(numberOfChannels, 
ResultPartitionType.PIPELINED, true);
-   }
-
-   public static SingleInputGate createSingleInputGate(
-   int numberOfChannels,
-   ResultPartitionType partitionType,
-   boolean isCreditBased) {
-
-   return new SingleInputGate(
-   "InputGate",
-   new JobID(),
-   new IntermediateDataSetID(),
-   partitionType,
-   0,
-   numberOfChannels,
-   new NoOpTaskActions(),
-   new SimpleCounter(),
-   isCreditBased);
+   return new 
SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).build();
}
 
public static ConnectionManager createDummy

[flink] 05/10: [hotfix][network] Introduce ResultPartitionFactory

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e546009b7360c341d74b53c5d805e84f6276a897
Author: Andrey Zagrebin 
AuthorDate: Fri May 10 17:33:00 2019 +0200

[hotfix][network] Introduce ResultPartitionFactory
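
    Extracting the factory moves the long inline new ResultPartition(...) call out
    of NetworkEnvironment, so partition construction has a single owner that tests
    and benchmarks can reuse. The call site shrinks to roughly (argument list
    abbreviated; see the truncated hunk below):

        resultPartitions[counter++] = resultPartitionFactory.create(taskName, ...);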
---
 .../runtime/io/network/NetworkEnvironment.java |  25 ++--
 .../io/network/partition/ResultPartition.java  |  64 +---
 .../network/partition/ResultPartitionFactory.java  | 162 +
 .../network/partition/ResultPartitionBuilder.java  |   9 +-
 4 files changed, 176 insertions(+), 84 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index ea482f1..7974e83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -88,7 +89,7 @@ public class NetworkEnvironment {
 
private final TaskEventPublisher taskEventPublisher;
 
-   private final IOManager ioManager;
+   private final ResultPartitionFactory resultPartitionFactory;
 
private boolean isShutdown;
 
@@ -98,14 +99,14 @@ public class NetworkEnvironment {
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
TaskEventPublisher taskEventPublisher,
-   IOManager ioManager) {
+   ResultPartitionFactory resultPartitionFactory) {
this.config = config;
this.networkBufferPool = networkBufferPool;
this.connectionManager = connectionManager;
this.resultPartitionManager = resultPartitionManager;
this.taskEventPublisher = taskEventPublisher;
-   this.ioManager = ioManager;
this.isShutdown = false;
+   this.resultPartitionFactory = resultPartitionFactory;
}
 
public static NetworkEnvironment create(
@@ -128,6 +129,8 @@ public class NetworkEnvironment {
registerNetworkMetrics(metricGroup, networkBufferPool);
 
ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+   ResultPartitionFactory resultPartitionFactory =
+   new ResultPartitionFactory(resultPartitionManager, 
ioManager);
 
return new NetworkEnvironment(
config,
@@ -135,7 +138,7 @@ public class NetworkEnvironment {
connectionManager,
resultPartitionManager,
taskEventPublisher,
-   ioManager);
+   resultPartitionFactory);
}
 
private static void registerNetworkMetrics(MetricGroup metricGroup, 
NetworkBufferPool networkBufferPool) {
@@ -283,18 +286,8 @@ public class NetworkEnvironment {
ResultPartition[] resultPartitions = new 
ResultPartition[resultPartitionDeploymentDescriptors.size()];
int counter = 0;
for (ResultPartitionDeploymentDescriptor rpdd : 
resultPartitionDeploymentDescriptors) {
-   resultPartitions[counter++] = new 
ResultPartition(
-   taskName,
-   taskActions,
-   jobId,
-   new 
ResultPartitionID(rpdd.getPartitionId(), executionId),
-   rpdd.getPartitionType(),
-   rpdd.getNumberOfSubpartitions(),
-   rpdd.getMaxParallelism(),
-   resultPartitionManager,
-   partitionConsumableNotifier,
-   ioManager,
-   
rpdd.sendScheduleOrUpdateConsumersMessage());
+   resultPartitions[counter++] = 
resultPartitionFactory.create(
+   taskN

[flink] 06/10: [hotfix][network] Introduce SingleInputGateFactory

2019-05-22 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c00d87969d6956a9931173e2bd2923493e33f676
Author: Andrey Zagrebin 
AuthorDate: Fri May 10 17:39:28 2019 +0200

[hotfix][network] Introduce SingleInputGateFactory
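
    Symmetrically to ResultPartitionFactory, gate construction moves behind a
    factory that already holds the config, connection manager, partition manager
    and event publisher, so the per-task call site no longer passes the
    environment around:

        inputGates[counter++] = singleInputGateFactory.create(taskName, jobId, igdd, ...);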
---
 .../runtime/io/network/NetworkEnvironment.java |  20 ++-
 .../partition/consumer/SingleInputGate.java|  96 -
 .../partition/consumer/SingleInputGateFactory.java | 158 +
 .../partition/consumer/SingleInputGateTest.java|  21 +--
 .../StreamNetworkBenchmarkEnvironment.java |  22 +--
 5 files changed, 197 insertions(+), 120 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 7974e83..d0e8263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -91,6 +92,8 @@ public class NetworkEnvironment {
 
private final ResultPartitionFactory resultPartitionFactory;
 
+   private final SingleInputGateFactory singleInputGateFactory;
+
private boolean isShutdown;
 
private NetworkEnvironment(
@@ -99,14 +102,16 @@ public class NetworkEnvironment {
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
TaskEventPublisher taskEventPublisher,
-   ResultPartitionFactory resultPartitionFactory) {
+   ResultPartitionFactory resultPartitionFactory,
+   SingleInputGateFactory singleInputGateFactory) {
this.config = config;
this.networkBufferPool = networkBufferPool;
this.connectionManager = connectionManager;
this.resultPartitionManager = resultPartitionManager;
this.taskEventPublisher = taskEventPublisher;
-   this.isShutdown = false;
this.resultPartitionFactory = resultPartitionFactory;
+   this.singleInputGateFactory = singleInputGateFactory;
+   this.isShutdown = false;
}
 
public static NetworkEnvironment create(
@@ -132,13 +137,17 @@ public class NetworkEnvironment {
ResultPartitionFactory resultPartitionFactory =
new ResultPartitionFactory(resultPartitionManager, 
ioManager);
 
+   SingleInputGateFactory singleInputGateFactory =
+   new SingleInputGateFactory(config, connectionManager, 
resultPartitionManager, taskEventPublisher);
+
return new NetworkEnvironment(
config,
networkBufferPool,
connectionManager,
resultPartitionManager,
taskEventPublisher,
-   resultPartitionFactory);
+   resultPartitionFactory,
+   singleInputGateFactory);
}
 
private static void registerNetworkMetrics(MetricGroup metricGroup, 
NetworkBufferPool networkBufferPool) {
@@ -168,6 +177,7 @@ public class NetworkEnvironment {
return networkBufferPool;
}
 
+   @VisibleForTesting
public NetworkEnvironmentConfiguration getConfiguration() {
return config;
}
@@ -311,12 +321,10 @@ public class NetworkEnvironment {
SingleInputGate[] inputGates = new 
SingleInputGate[inputGateDeploymentDescriptors.size()];
int counter = 0;
for (InputGateDeploymentDescriptor igdd : 
inputGateDeploymentDescriptors) {
-   inputGates[counter++] = SingleInputGate.create(
+   inputGates[counter++] = 
singleInputGateFactory.create(
taskName,
jobId,
igdd,
-   this,
-   taskEventPublisher,
   

[flink] branch master updated: [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API

2019-05-22 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e16fa9f  [FLINK-12439][python] Add FileSystem Connector with CSV 
format support in Python Table API
e16fa9f is described below

commit e16fa9fe1506ec725f5c5abb2b24afa246e17dae
Author: Wei Zhong 
AuthorDate: Tue May 21 16:25:16 2019 +0800

[FLINK-12439][python] Add FileSystem Connector with CSV format support in 
Python Table API

Brief change log:
 - Add all of the existing descriptor interfaces, aligned with the Java Table API.
 - Add FileSystem connector and OldCsv format support.
 - The `schema(..)` of OldCsv will be added in FLINK-12588.

This closes #8488
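
    For orientation, the new Python classes mirror the Java Table API descriptor
    chain; the Java-side shape they wrap looks roughly like this (path, field name
    and delimiter are made up for illustration, and the exact chain may differ
    slightly by version):

        tableEnv
            .connect(new FileSystem().path("/tmp/words.csv"))
            .withFormat(new OldCsv()
                .field("word", Types.STRING)
                .fieldDelimiter(","))
            .registerTableSource("words");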
---
 flink-python/pyflink/java_gateway.py   |   1 +
 flink-python/pyflink/table/__init__.py |   5 +
 flink-python/pyflink/table/table_descriptor.py | 510 +
 flink-python/pyflink/table/table_environment.py|  91 +++
 .../pyflink/table/tests/test_descriptor.py | 618 +
 .../table/tests/test_environment_completeness.py   |   2 +-
 6 files changed, 1226 insertions(+), 1 deletion(-)

diff --git a/flink-python/pyflink/java_gateway.py 
b/flink-python/pyflink/java_gateway.py
index ed1dc89..e5c8330 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -101,6 +101,7 @@ def launch_gateway():
 java_import(gateway.jvm, "org.apache.flink.table.api.*")
 java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
 java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
+java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
 java_import(gateway.jvm, "org.apache.flink.table.sources.*")
 java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
 java_import(gateway.jvm, 
"org.apache.flink.api.common.typeinfo.TypeInformation")
diff --git a/flink-python/pyflink/table/__init__.py 
b/flink-python/pyflink/table/__init__.py
index dcbc0ab..4ea3b3f 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,6 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
 from pyflink.table.table_source import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes
 from pyflink.table.window import Tumble, Session, Slide, Over
+from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem
 
 __all__ = [
 'TableEnvironment',
@@ -56,4 +57,8 @@ __all__ = [
 'Session',
 'Slide',
 'Over',
+'Rowtime',
+'Schema',
+'OldCsv',
+'FileSystem',
 ]
diff --git a/flink-python/pyflink/table/table_descriptor.py 
b/flink-python/pyflink/table/table_descriptor.py
new file mode 100644
index 000..ed9a157
--- /dev/null
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -0,0 +1,510 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+from abc import ABCMeta
+
+from py4j.java_gateway import get_method
+
+from pyflink.java_gateway import get_gateway
+from pyflink.util.type_utils import to_java_type
+
+if sys.version >= '3':
+    unicode = str
+
+__all__ = [
+'Rowtime',
+'Schema',
+'OldCsv',
+'FileSystem'
+]
+
+
+class Descriptor(object):
+"""
+Base class of the descriptors that adds a set of string-based, normalized 
properties for
+describing DDL information.
+
+Typical characteristics of a descriptor are:
+- descriptors have a default constructor
+- descriptors themselves contain very little logic
+- corresponding validators validate the correctness (goal: have a single 
point of validation)
+
+A descriptor is similar to a builder in a builder pattern, thus, mutable 
for building
+properties.
+"""
+
+__metaclass__ = ABCMeta
+
+def __init__(self, j_descriptor):
+self._j_descriptor = j_descriptor
+
+def to_properties(self):
+"""
+Converts 

[flink] branch master updated: [FLINK-12578][build] Use https URL for Maven repositories

2019-05-22 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 65477d3  [FLINK-12578][build] Use https URL for Maven repositories
65477d3 is described below

commit 65477d365330751cd55db1c3b18e1fc9bd90d321
Author: Jungtaek Lim 
AuthorDate: Wed May 22 19:16:02 2019 +0900

[FLINK-12578][build] Use https URL for Maven repositories

MapR repositories are excluded from this change as there appears to be some 
certificate issue on the MapR side. The latest documentation 
(https://mapr.com/docs/61/DevelopmentGuide/MavenArtifacts.html) also recommends 
using the http url.
---
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml | 2 +-
 flink-formats/flink-avro-confluent-registry/pom.xml| 2 +-
 pom.xml| 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml 
b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
index 71ffb98..12d4950 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml
@@ -35,7 +35,7 @@ under the License.


confluent
-   http://packages.confluent.io/maven/
+   https://packages.confluent.io/maven/


 
diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml 
b/flink-formats/flink-avro-confluent-registry/pom.xml
index 9c89167..aa6382f 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -32,7 +32,7 @@ under the License.


confluent
-   http://packages.confluent.io/maven/
+   https://packages.confluent.io/maven/


 
diff --git a/pom.xml b/pom.xml
index ed9ae64..c7b6ac3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1017,14 +1017,14 @@ under the License.

HDPReleases
HDP Releases
-   
http://repo.hortonworks.com/content/repositories/releases/
+   
https://repo.hortonworks.com/content/repositories/releases/

false

true


HortonworksJettyHadoop
HDP Jetty
-   
http://repo.hortonworks.com/content/repositories/jetty-hadoop
+   
https://repo.hortonworks.com/content/repositories/jetty-hadoop

false

true




[flink] 01/02: [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop.

2019-05-22 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 022f6cceef65859bc6f172151d09140038297f69
Author: Stefan Richter 
AuthorDate: Fri May 10 11:20:02 2019 +0200

[FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask 
main-loop.

This closes #8409.
This closes #8431.

This also decomposes monolithic run-loops in StreamTask implementations 
into step-wise calls.
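
    The underlying idea: instead of one blocking run-loop per task plus ad-hoc
    synchronization on the checkpoint lock, the task thread runs a single loop
    that drains "letters" posted by other threads and otherwise performs one
    bounded step of the task's default action. A self-contained sketch of the
    principle; the real Mailbox/MailboxImpl below add capacity bounds, lifecycle
    states and locking on top:

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        final class MailboxLoopSketch {
            private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
            private volatile boolean running = true;

            // other threads enqueue work instead of competing for the checkpoint lock
            void sendLetter(Runnable letter) {
                mailbox.add(letter);
            }

            void stop() {
                sendLetter(() -> running = false);
            }

            // the task thread: run pending letters, then one step of the default action
            void run(Runnable defaultAction) {
                while (running) {
                    Runnable letter;
                    while ((letter = mailbox.poll()) != null) {
                        letter.run();
                    }
                    defaultAction.run(); // e.g. process one unit of input
                }
            }
        }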
---
 .../runtime/tasks/OneInputStreamTask.java  |  12 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |   5 +-
 .../runtime/tasks/StreamIterationHead.java | 105 -
 .../flink/streaming/runtime/tasks/StreamTask.java  |  82 ++-
 .../runtime/tasks/TwoInputStreamTask.java  |  13 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  36 
 .../runtime/tasks/mailbox/MailboxImpl.java | 236 +
 .../runtime/tasks/mailbox/MailboxReceiver.java |  59 ++
 .../runtime/tasks/mailbox/MailboxSender.java   |  52 +
 ...heckpointExceptionHandlerConfigurationTest.java |   4 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   4 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java|  21 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   3 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |   4 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java | 170 +++
 .../flink/streaming/util/MockStreamTask.java   |   4 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java|  10 +-
 19 files changed, 726 insertions(+), 100 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 7498518..7b82d8f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -39,8 +39,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 	private StreamInputProcessor<IN> inputProcessor;
 
-   private volatile boolean running = true;
-
private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
 
/**
@@ -98,12 +96,9 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 	@Override
-	protected void run() throws Exception {
-		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
-
-		while (running && inputProcessor.processInput()) {
-			// all the work happens in the "processInput" method
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		if (!inputProcessor.processInput()) {
+			context.allActionsCompleted();
 		}
 	}
 
@@ -116,6 +111,5 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> {
 	}
 
 	@Override
-	protected void run() throws Exception {
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
+		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
 		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+		context.allActionsCompleted();
 	}
 
@Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index ecef7f0..d25bd23 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,88 +42,72 @@ public class StreamIterationHead<IN> extends OneInputStreamTask<IN, IN> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
-   private volatile boolean running = true;
+	private RecordWriterOutput<IN>[] streamOutputs;
+
+	private final BlockingQueue<StreamRecord<IN>> dataChannel;
+   private final String brokerID;
+   private final long iterationWaitTime;
+   private final boolean shouldWait;
 
public StreamIterationHead(E

[flink] 02/02: [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.

2019-05-22 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845
Author: Stefan Richter 
AuthorDate: Tue May 14 15:33:48 2019 +0200

[FLINK-12483][runtime] Support (legacy) SourceFunction as special case in 
the mailbox model for stream tasks.

This closes #8442.
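
    The pattern in miniature: the legacy source keeps its blocking loop on a
    dedicated thread, and a sentinel letter is the only way the mailbox loop
    learns that the source is done. Self-contained sketch (names follow the diff
    below; the real code additionally runs each letter under the checkpoint lock):

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        final class PoisonLetterSketch {
            private static final Runnable SOURCE_POISON_LETTER = () -> {};
            private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

            void runSource(Runnable legacySourceLoop) throws InterruptedException {
                Thread sourceThread = new Thread(() -> {
                    try {
                        legacySourceLoop.run();            // blocking legacy source loop
                    } finally {
                        mailbox.add(SOURCE_POISON_LETTER); // always unblock the mailbox loop
                    }
                });
                sourceThread.start();

                while (true) {
                    Runnable letter = mailbox.take();      // blocks until work or poison
                    if (letter == SOURCE_POISON_LETTER) {
                        break;                             // source finished or failed
                    }
                    letter.run();                          // e.g. trigger a checkpoint
                }
                sourceThread.join();
            }
        }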
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 66 +-
 1 file changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fd50a1a..e604f2c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
 	extends StreamTask<OUT, OP> {
 
+   private static final Runnable SOURCE_POISON_LETTER = () -> {};
+
private volatile boolean externallyInducedCheckpoints;
 
public SourceStreamTask(Environment env) {
@@ -101,12 +103,43 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
protected void performDefaultAction(ActionContext context) throws 
Exception {
// Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
// compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
-   headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
+   final LegacySourceFunctionThread sourceThread = new 
LegacySourceFunctionThread();
+   sourceThread.start();
+
+   // We run an alternative mailbox loop that does not involve 
default actions and synchronizes around actions.
+   try {
+   runAlternativeMailboxLoop();
+   } catch (Exception mailboxEx) {
+   // We cancel the source function if some runtime 
exception escaped the mailbox.
+   if (!isCanceled()) {
+   cancelTask();
+   }
+   throw mailboxEx;
+   }
+
+   sourceThread.join();
+   sourceThread.checkThrowSourceExecutionException();
+
context.allActionsCompleted();
}
 
+   private void runAlternativeMailboxLoop() throws InterruptedException {
+
+   while (true) {
+
+   Runnable letter = mailbox.takeMail();
+   if (letter == SOURCE_POISON_LETTER) {
+   break;
+   }
+
+   synchronized (getCheckpointLock()) {
+   letter.run();
+   }
+   }
+   }
+
@Override
-   protected void cancelTask() throws Exception {
+   protected void cancelTask() {
if (headOperator != null) {
headOperator.cancel();
}
@@ -133,4 +166,33 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
}
}
}
+
+   /**
+	 * Runnable that executes the source function in the head operator.
+*/
+   private class LegacySourceFunctionThread extends Thread {
+
+   private Throwable sourceExecutionThrowable;
+
+   LegacySourceFunctionThread() {
+   this.sourceExecutionThrowable = null;
+   }
+
+   @Override
+   public void run() {
+   try {
+   headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
+   } catch (Throwable t) {
+   sourceExecutionThrowable = t;
+   } finally {
+   mailbox.clearAndPut(SOURCE_POISON_LETTER);
+   }
+   }
+
+   void checkThrowSourceExecutionException() throws Exception {
+   if (sourceExecutionThrowable != null) {
+   throw new Exception(sourceExecutionThrowable);
+   }
+   }
+   }
 }



[flink] branch master updated (3813bb9 -> ead9139)

2019-05-22 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3813bb9  [FLINK-12241][hive] Support Flink functions in catalog 
function APIs of HiveCatalog
 new 022f6cc  [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to 
StreamTask main-loop.
 new ead9139  [FLINK-12483][runtime] Support (legacy) SourceFunction as 
special case in the mailbox model for stream tasks.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/tasks/OneInputStreamTask.java  |  12 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  71 ++-
 .../runtime/tasks/StreamIterationHead.java | 105 -
 .../flink/streaming/runtime/tasks/StreamTask.java  |  82 ++-
 .../runtime/tasks/TwoInputStreamTask.java  |  13 +-
 .../streaming/runtime/tasks/mailbox/Mailbox.java   |  23 +-
 .../runtime/tasks/mailbox/MailboxImpl.java | 236 +
 .../runtime/tasks/mailbox/MailboxReceiver.java |  59 ++
 .../runtime/tasks/mailbox/MailboxSender.java   |  52 +
 ...heckpointExceptionHandlerConfigurationTest.java |   4 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   4 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java|  21 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   3 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   3 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |   4 +-
 .../runtime/tasks/mailbox/MailboxImplTest.java | 170 +++
 .../flink/streaming/util/MockStreamTask.java   |   4 +-
 .../jobmaster/JobMasterStopWithSavepointIT.java|  10 +-
 19 files changed, 762 insertions(+), 117 deletions(-)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java
 => 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java
 (60%)
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java
 create mode 100644 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java