[flink] branch master updated: [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7a3f081 [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation 7a3f081 is described below commit 7a3f0813d15982177318c6c58cbe1d83799132e4 Author: Dawid Wysakowicz AuthorDate: Mon May 6 09:06:35 2019 +0200 [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation This does not fix the problem. It just makes the e2e test pass. For a proper fix of the underlying problem see FLINK-12249. --- .../main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java| 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java index 47bca8e..cde040d 100644 --- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java @@ -106,7 +106,9 @@ public class StreamSQLTestProgram { String tumbleQuery = String.format( "SELECT " + " key, " + - " CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 ELSE 99 END AS correct, " + + //TODO: The "WHEN -1 THEN NULL" part is a temporary workaround, to make the test pass, for + // https://issues.apache.org/jira/browse/FLINK-12249. We should remove it once the issue is fixed. + " CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 WHEN -1 THEN NULL ELSE 99 END AS correct, " + " TUMBLE_START(rowtime, INTERVAL '%d' SECOND) AS wStart, " + " TUMBLE_ROWTIME(rowtime, INTERVAL '%d' SECOND) AS rowtime " + "FROM (%s) " +
[flink] branch master updated: [FLINK-12563][table-runtime-blink] Introduce vector data format in blink
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1e6ce1b [FLINK-12563][table-runtime-blink] Introduce vector data format in blink 1e6ce1b is described below commit 1e6ce1bb0c38be3ccf6baa5f3c7acedb86528080 Author: Jingsong Lee AuthorDate: Thu May 23 14:45:24 2019 +0800 [FLINK-12563][table-runtime-blink] Introduce vector data format in blink This closes #8492 --- .../apache/flink/table/dataformat/ColumnarRow.java | 210 + .../dataformat/vector/AbstractColumnVector.java| 62 ++ .../dataformat/vector/BooleanColumnVector.java | 26 +++ .../table/dataformat/vector/ByteColumnVector.java | 26 +++ .../table/dataformat/vector/BytesColumnVector.java | 48 + .../table/dataformat/vector/ColumnVector.java | 32 .../flink/table/dataformat/vector/Dictionary.java | 34 .../dataformat/vector/DoubleColumnVector.java | 26 +++ .../table/dataformat/vector/FloatColumnVector.java | 26 +++ .../table/dataformat/vector/IntColumnVector.java | 26 +++ .../table/dataformat/vector/LongColumnVector.java | 26 +++ .../table/dataformat/vector/ShortColumnVector.java | 26 +++ .../dataformat/vector/VectorizedColumnBatch.java | 134 + .../dataformat/vector/heap/AbstractHeapVector.java | 132 + .../dataformat/vector/heap/HeapBooleanVector.java | 51 + .../dataformat/vector/heap/HeapByteVector.java | 50 + .../dataformat/vector/heap/HeapBytesVector.java| 137 ++ .../dataformat/vector/heap/HeapDoubleVector.java | 52 + .../dataformat/vector/heap/HeapFloatVector.java| 51 + .../dataformat/vector/heap/HeapIntVector.java | 50 + .../dataformat/vector/heap/HeapLongVector.java | 50 + .../dataformat/vector/heap/HeapShortVector.java| 50 + .../vector/VectorizedColumnBatchTest.java | 190 +++ 23 files changed, 1515 insertions(+) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java new file mode 100644 index 000..8764c98 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes; +import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; + +/** + * Columnar row to support access to vector column data. It is a row view in {@link VectorizedColumnBatch}. + */ +public final class ColumnarRow implements BaseRow { + private byte header; + private VectorizedColumnBatch vectorizedColumnBatch; + private int rowId; + + public ColumnarRow() {} + + public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) { + this(vectorizedColumnBatch, 0); + } + + public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int rowId) { + this.vectorizedColumnBatch = vectorizedColumnBatch; + this.rowId = rowId; + } + + public void setVectorizedColumnBatch( + VectorizedColumnBatch vectorizedColumnBatch) { + this.vectorizedColumnBatch = vectorizedColumnBatch; + this.rowId = 0; + } + + public void setRowId(int rowId) { + this.rowId = rowId; + } + + @Override + public byte getHeader() { + return header; + } + + @Override + public void setHeader(byte header) { + this.header = header; + } + + @Override + public int getArity() { + return vectorizedColumnBatch.getArity(); + } + + @Override + public boolean isNullAt(int ordinal) { + return vectorizedColumnBatch.isNullAt(rowId, ordi
[flink] branch master updated: [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new aca9c01 [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class aca9c01 is described below commit aca9c018a4b26e50333c33426e77323cbb9b7960 Author: bowen.li AuthorDate: Wed May 22 14:34:42 2019 -0700 [FLINK-12582][table][hive] Alteration APIs in catalogs should check existing object and new object are of the same class This PR supports alterations in catalogs to check existing object and new object are of the same class. Most of them currently don't, e.g. you can alter an existing generic table with a new hive table in GenericInMemoryCatalog. This closes #8514. --- .../flink/table/catalog/hive/HiveCatalog.java | 55 .../hive/HiveCatalogGenericMetadataTest.java | 16 --- .../catalog/hive/HiveCatalogHiveMetadataTest.java | 3 + .../table/catalog/GenericInMemoryCatalog.java | 41 +- .../table/catalog/GenericInMemoryCatalogTest.java | 40 +++--- .../flink/table/catalog/CatalogTestBase.java | 155 ++--- 6 files changed, 218 insertions(+), 92 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index d6818f7..22fe08b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -27,8 +27,6 @@ import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.GenericCatalogDatabase; import org.apache.flink.table.catalog.GenericCatalogFunction; import org.apache.flink.table.catalog.GenericCatalogTable; @@ -230,14 +228,27 @@ public class HiveCatalog implements Catalog { checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty"); checkNotNull(newDatabase, "newDatabase cannot be null"); + CatalogDatabase existingDatabase; + try { + existingDatabase = getDatabase(databaseName); + } catch (DatabaseNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } + return; + } + + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format("Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), newDatabase.getClass().getName()) + ); + } + Database newHiveDatabase = instantiateHiveDatabase(databaseName, newDatabase); try { - if (databaseExists(databaseName)) { - client.alterDatabase(databaseName, newHiveDatabase); - } else if (!ignoreIfNotExists) { - throw new DatabaseNotExistException(catalogName, databaseName); - } + client.alterDatabase(databaseName, newHiveDatabase); } catch (TException e) { throw new CatalogException(String.format("Failed to alter database %s", databaseName), e); } @@ -360,32 +371,22 @@ public class HiveCatalog implements Catalog { checkNotNull(tablePath, "tablePath cannot be null"); checkNotNull(newCatalogTable, "newCatalogTable cannot be null"); - if (!tableExists(tablePath)) { + Table hiveTable; + try { + hiveTable = getHiveTable(tablePath); + } catch (TableNotExistException e) { if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); + throw e; } return; } - Table oldTable = getHiveTable(tablePath); - TableType oldTableType = TableType.valueOf(oldTable.getTableType()); +
[flink] branch master updated: [FLINK-12335][table-runtime-blink] Improvement the code and performance of class SegmentsUtil
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 11a96fd [FLINK-12335][table-runtime-blink] Improvement the code and performance of class SegmentsUtil 11a96fd is described below commit 11a96fdf213467595dad73cffd9b05134a4d0d75 Author: liyafan82 AuthorDate: Fri Apr 26 17:43:26 2019 +0800 [FLINK-12335][table-runtime-blink] Improvement the code and performance of class SegmentsUtil This closes #8278 --- .../org/apache/flink/table/util/SegmentsUtil.java | 27 ++ 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java index daf7ebd..47475a4 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java @@ -36,7 +36,7 @@ public class SegmentsUtil { */ public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; - private static final int BIT_BYTE_POSITION_MASK = 0xfff8; + private static final int ADDRESS_BITS_PER_WORD = 3; private static final int BIT_BYTE_INDEX_MASK = 7; @@ -410,6 +410,15 @@ public class SegmentsUtil { } /** +* Given a bit index, return the byte index containing it. +* @param bitIndex the bit index. +* @return the byte index. +*/ + private static int byteIndex(int bitIndex) { + return bitIndex >>> ADDRESS_BITS_PER_WORD; + } + + /** * unset bit. * * @param segment target segment. @@ -417,7 +426,7 @@ public class SegmentsUtil { * @param index bit index from base offset. */ public static void bitUnSet(MemorySegment segment, int baseOffset, int index) { - int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3); + int offset = baseOffset + byteIndex(index); byte current = segment.get(offset); current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); segment.put(offset, current); @@ -431,7 +440,7 @@ public class SegmentsUtil { * @param index bit index from base offset. */ public static void bitSet(MemorySegment segment, int baseOffset, int index) { - int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3); + int offset = baseOffset + byteIndex(index); byte current = segment.get(offset); current |= (1 << (index & BIT_BYTE_INDEX_MASK)); segment.put(offset, current); @@ -445,7 +454,7 @@ public class SegmentsUtil { * @param index bit index from base offset. */ public static boolean bitGet(MemorySegment segment, int baseOffset, int index) { - int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3); + int offset = baseOffset + byteIndex(index); byte current = segment.get(offset); return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0; } @@ -460,7 +469,7 @@ public class SegmentsUtil { public static void bitUnSet(MemorySegment[] segments, int baseOffset, int index) { if (segments.length == 1) { MemorySegment segment = segments[0]; - int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3); + int offset = baseOffset + byteIndex(index); byte current = segment.get(offset); current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); segment.put(offset, current); @@ -470,7 +479,7 @@ public class SegmentsUtil { } private static void bitUnSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) { - int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3); + int offset = baseOffset + byteIndex(index); int segSize = segments[0].size(); int segIndex = offset / segSize; int segOffset = offset - segIndex * segSize; // equal to % @@ -490,7 +499,7 @@ public class SegmentsUtil { */ public static void bitSet(MemorySegment[] segments, int baseOffset, int index) { if (segments.length == 1) { - int offset = baseOffset + ((index & BIT_BYTE_POSITION_MASK) >>> 3); + int offset = baseOffset + byteIndex(index); MemorySegment segment = segments[0
[flink] branch release-1.8 updated: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 7b862dc [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) 7b862dc is described below commit 7b862dcd91a2506f99dd6e1feacab57bc78fa4e6 Author: Kailash Dayanand AuthorDate: Wed May 22 10:40:27 2019 -0700 [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) --- .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index dc0b1c6..f5b1bf9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -127,7 +127,7 @@ public class StreamingFileSink /** * Creates a new {@code StreamingFileSink} that writes files to the given base directory. */ - private StreamingFileSink( + protected StreamingFileSink( final StreamingFileSink.BucketsBuilder bucketsBuilder, final long bucketCheckInterval) { @@ -170,7 +170,7 @@ public class StreamingFileSink /** * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. */ - private abstract static class BucketsBuilder implements Serializable { + protected abstract static class BucketsBuilder implements Serializable { private static final long serialVersionUID = 1L;
[flink] branch master updated (f652bb5 -> 7748729)
This is an automated email from the ASF dual-hosted git repository. thw pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f652bb5 [FLINK-12447][build] Set minimum maven version to 3.1.1 add 7748729 [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) No new revisions were added by this update. Summary of changes: .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[flink] branch cron-master-maven_compat updated: [FLINK-12447][travis] Remove maven 3.0.5 build
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch cron-master-maven_compat in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/cron-master-maven_compat by this push: new 1a5fdba [FLINK-12447][travis] Remove maven 3.0.5 build 1a5fdba is described below commit 1a5fdbae00aafa8b7fbcab6da687f173db60797a Author: Robert Metzger <89049+rmetz...@users.noreply.github.com> AuthorDate: Wed May 22 19:23:45 2019 +0200 [FLINK-12447][travis] Remove maven 3.0.5 build --- .travis.yml | 4 1 file changed, 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5a7a54a..5dcfdb7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,10 +36,6 @@ matrix: - env: - REMOTE="apache" - BRANCH="master" - - MVN_VERSION="3.0.5" -- env: - - REMOTE="apache" - - BRANCH="master" - MVN_VERSION="3.1.1" - env: - REMOTE="apache"
[flink] branch master updated (02a0cf3 -> f652bb5)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 02a0cf3 [FLINK-12566][table] Remove row interval type add f652bb5 [FLINK-12447][build] Set minimum maven version to 3.1.1 No new revisions were added by this update. Summary of changes: README.md| 4 ++-- docs/flinkDev/building.md| 2 +- docs/flinkDev/building.zh.md | 2 +- pom.xml | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-)
[flink] 03/03: [hotfix][hive] unify instantiateHiveDatabase() in HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 702c489acd3adb4bd354aa1e2384e5344ad07395 Author: bowen.li AuthorDate: Wed May 22 11:46:06 2019 -0700 [hotfix][hive] unify instantiateHiveDatabase() in HiveCatalog --- .../flink/table/catalog/hive/HiveCatalog.java | 51 +- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 07a7159..d6818f7 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -189,14 +189,7 @@ public class HiveCatalog implements Catalog { checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty"); checkNotNull(database, "database cannot be null"); - Database hiveDatabase; - if (database instanceof HiveCatalogDatabase) { - hiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) database); - } else if (database instanceof GenericCatalogDatabase) { - hiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) database); - } else { - throw new CatalogException(String.format("Unsupported catalog database type %s", database.getClass()), null); - } + Database hiveDatabase = instantiateHiveDatabase(databaseName, database); try { client.createDatabase(hiveDatabase); @@ -209,21 +202,26 @@ public class HiveCatalog implements Catalog { } } - private static Database instantiateHiveDatabase(String databaseName, HiveCatalogDatabase database) { - return new Database(databaseName, - database.getComment(), - database.getLocation(), - database.getProperties()); - } - - private static Database instantiateHiveDatabase(String databaseName, GenericCatalogDatabase database) { - Map properties = database.getProperties(); + private static Database instantiateHiveDatabase(String databaseName, CatalogDatabase database) { + if (database instanceof HiveCatalogDatabase) { + HiveCatalogDatabase db = (HiveCatalogDatabase) database; + return new Database( + databaseName, + db.getComment(), + db.getLocation(), + db.getProperties()); + } else if (database instanceof GenericCatalogDatabase) { + GenericCatalogDatabase db = (GenericCatalogDatabase) database; - return new Database(databaseName, - database.getComment(), - // HDFS location URI which GenericCatalogDatabase shouldn't care - null, - maskFlinkProperties(properties)); + return new Database( + databaseName, + db.getComment(), + // HDFS location URI which GenericCatalogDatabase shouldn't care + null, + maskFlinkProperties(db.getProperties())); + } else { + throw new CatalogException(String.format("Unsupported catalog database type %s", database.getClass()), null); + } } @Override @@ -232,14 +230,7 @@ public class HiveCatalog implements Catalog { checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty"); checkNotNull(newDatabase, "newDatabase cannot be null"); - Database newHiveDatabase; - if (newDatabase instanceof HiveCatalogDatabase) { - newHiveDatabase = instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) newDatabase); - } else if (newDatabase instanceof GenericCatalogDatabase) { - newHiveDatabase = instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) newDatabase); - } else { - throw new CatalogException(String.format("Unsupported catalog database type %s", newDatabase.getClass()), null); - } + Database newHiveDatabase = instantiateHiveDatabase(databaseName
[flink] branch master updated (02a0cf3 -> 702c489)
This is an automated email from the ASF dual-hosted git repository. bli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 02a0cf3 [FLINK-12566][table] Remove row interval type new f652bb5 [FLINK-12447][build] Set minimum maven version to 3.1.1 new 7748729 [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) new 702c489 [hotfix][hive] unify instantiateHiveDatabase() in HiveCatalog The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: README.md | 4 +- docs/flinkDev/building.md | 2 +- docs/flinkDev/building.zh.md | 2 +- .../flink/table/catalog/hive/HiveCatalog.java | 51 +- .../sink/filesystem/StreamingFileSink.java | 4 +- pom.xml| 4 +- 6 files changed, 29 insertions(+), 38 deletions(-)
[flink] 01/03: [FLINK-12447][build] Set minimum maven version to 3.1.1
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f652bb5a6ff3855ccf0038a5c0469896f41df908 Author: Robert Metzger <89049+rmetz...@users.noreply.github.com> AuthorDate: Wed May 22 19:24:53 2019 +0200 [FLINK-12447][build] Set minimum maven version to 3.1.1 --- README.md| 4 ++-- docs/flinkDev/building.md| 2 +- docs/flinkDev/building.zh.md | 2 +- pom.xml | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 34bf2c4..034189e 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ Prerequisites for building Flink: * Unix-like environment (we use Linux, Mac OS X, Cygwin) * git -* Maven (we recommend version 3.2.5) +* Maven (we recommend version 3.2.5 and require at least 3.1.1) * Java 8 (Java 9 and 10 are not yet supported) ``` @@ -80,7 +80,7 @@ mvn clean package -DskipTests # this will take up to 10 minutes Flink is now installed in `build-target` -*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly. +*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.1.1 creates the libraries properly. To build unit tests with Java 8, use Java 8u51 or above to prevent failures in unit tests that use the PowerMock runner.* ## Developing Flink diff --git a/docs/flinkDev/building.md b/docs/flinkDev/building.md index c3a50d1..521bc9f 100644 --- a/docs/flinkDev/building.md +++ b/docs/flinkDev/building.md @@ -64,7 +64,7 @@ Flink [shades away](https://maven.apache.org/plugins/maven-shade-plugin/) some o The dependency shading mechanism was recently changed in Maven and requires users to build Flink slightly differently, depending on their Maven version: -**Maven 3.0.x, 3.1.x, and 3.2.x** +**Maven 3.1.x and 3.2.x** It is sufficient to call `mvn clean install -DskipTests` in the root directory of Flink code base. **Maven 3.3.x** diff --git a/docs/flinkDev/building.zh.md b/docs/flinkDev/building.zh.md index 75eddd5..c48de50 100644 --- a/docs/flinkDev/building.zh.md +++ b/docs/flinkDev/building.zh.md @@ -64,7 +64,7 @@ Flink [shades away](https://maven.apache.org/plugins/maven-shade-plugin/) some o The dependency shading mechanism was recently changed in Maven and requires users to build Flink slightly differently, depending on their Maven version: -**Maven 3.0.x, 3.1.x, and 3.2.x** +**Maven 3.1.x and 3.2.x** It is sufficient to call `mvn clean install -DskipTests` in the root directory of Flink code base. **Maven 3.3.x** diff --git a/pom.xml b/pom.xml index c7b6ac3..086e452 100644 --- a/pom.xml +++ b/pom.xml @@ -1502,8 +1502,8 @@ under the License. - - [3.0.3,) + + [3.1.1,) ${java.version}
[flink] 02/03: [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469)
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 774872927d12ed6662addb8b601a9140226c8625 Author: Kailash Dayanand AuthorDate: Wed May 22 10:40:27 2019 -0700 [FLINK-12539] [fs-connector] Make StreamingFileSink customizable (#8469) --- .../streaming/api/functions/sink/filesystem/StreamingFileSink.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index dc0b1c6..f5b1bf9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -127,7 +127,7 @@ public class StreamingFileSink /** * Creates a new {@code StreamingFileSink} that writes files to the given base directory. */ - private StreamingFileSink( + protected StreamingFileSink( final StreamingFileSink.BucketsBuilder bucketsBuilder, final long bucketCheckInterval) { @@ -170,7 +170,7 @@ public class StreamingFileSink /** * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. */ - private abstract static class BucketsBuilder implements Serializable { + protected abstract static class BucketsBuilder implements Serializable { private static final long serialVersionUID = 1L;
[flink-web] branch asf-site updated (2419879 -> a47c0cf)
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 2419879 Rebuild website new c28aae2 [Blog] State TTL in Flink 1.8.0. new a47c0cf rebuild site The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: _posts/2019-05-17-state-ttl.md| 135 +++ content/2019/05/19/state-ttl.html | 335 ++ content/blog/index.html | 36 ++-- content/blog/page2/index.html | 38 +++-- content/blog/page3/index.html | 42 +++-- content/blog/page4/index.html | 42 +++-- content/blog/page5/index.html | 40 +++-- content/blog/page6/index.html | 40 +++-- content/blog/page7/index.html | 40 +++-- content/blog/page8/index.html | 40 +++-- content/blog/page9/index.html | 25 +++ content/index.html| 6 +- content/zh/index.html | 6 +- 13 files changed, 699 insertions(+), 126 deletions(-) create mode 100644 _posts/2019-05-17-state-ttl.md create mode 100644 content/2019/05/19/state-ttl.html
[flink-web] 01/02: [Blog] State TTL in Flink 1.8.0.
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit c28aae2eb9d2976828596d2182ed728e48bdd4dc Author: Marta Paes Moreira AuthorDate: Mon May 20 13:23:12 2019 +0200 [Blog] State TTL in Flink 1.8.0. Submitting the markup file for a blog post by Andrey and Fabian about state TTL. This closes #216 --- _posts/2019-05-17-state-ttl.md | 135 + 1 file changed, 135 insertions(+) diff --git a/_posts/2019-05-17-state-ttl.md b/_posts/2019-05-17-state-ttl.md new file mode 100644 index 000..592f180 --- /dev/null +++ b/_posts/2019-05-17-state-ttl.md @@ -0,0 +1,135 @@ +--- +layout: post +title: "State TTL in Flink 1.8.0: How to Automatically Cleanup Application State in Apache Flink" +date: 2019-05-19T12:00:00.000Z +authors: +- fabian: + name: "Fabian Hueske" + twitter: "fhueske" +- andrey: + name: "Andrey Zagrebin" + + +excerpt: A common requirement for many stateful streaming applications is to automatically cleanup application state for effective management of your state size, or to control how long the application state can be accessed. State TTL enables application state cleanup and efficient state size management in Apache Flink +--- + +A common requirement for many stateful streaming applications is to automatically cleanup application state for effective management of your state size, or to control how long the application state can be accessed (e.g. due to legal regulations like the GDPR). The state time-to-live (TTL) feature was initiated in Flink 1.6.0 and enabled application state cleanup and efficient state size management in Apache Flink. + +In this post, we motivate the State TTL feature and discuss its use cases. Moreover, we show how to use and configure it. We explain how Flink internally manages state with TTL and present some exciting additions to the feature in Flink 1.8.0. The blog post concludes with an outlook on future improvements and extensions. + +# The Transient Nature of State + +There are two major reasons why state should be maintained only for a limited time. For example, let’s imagine a Flink application that ingests a stream of user login events and stores for each user the time of the last login to improve the experience of frequent visitors. + +* **Controlling the size of state.** +Being able to efficiently manage an ever-growing state size is a primary use case for state TTL. Oftentimes, data needs to be persisted temporarily while there is some user activity around it, e.g. web sessions. When the activity ends there is no longer interest in that data while it still occupies storage. Flink 1.8.0 introduces background cleanup of old state based on TTL that makes the eviction of no-longer-necessary data frictionless. Previously, the application developer had to take [...] + +* **Complying with data protection and sensitive data requirements.** +Recent developments around data privacy regulations, such as the General Data Protection Regulation (GDPR) introduced by the European Union, make compliance with such data requirements or treating sensitive data a top priority for many use cases and applications. An example of such use cases includes applications that require keeping data for a specific timeframe and preventing access to it thereafter. This is a common challenge for companies providing short-term services to their custom [...] + +Both requirements can be addressed by a feature that periodically, yet continuously, removes the state for a key once it becomes unnecessary or unimportant and there is no requirement to keep it in storage any more. + +# State TTL for continuous cleanup of application state + +The 1.6.0 release of Apache Flink introduced the State TTL feature. It enabled developers of stream processing applications to configure the state of operators to expire and be cleaned up after a defined timeout (time-to-live). In Flink 1.8.0 the feature was extended, including continuous cleanup of old entries for both the RocksDB and the heap state backends (FSStateBackend and MemoryStateBackend), enabling a continuous cleanup process of old entries (according to the TTL setting). + +In Flink’s DataStream API, application state is defined by a [state descriptor](https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#using-managed-keyed-state). State TTL is configured by passing a `StateTtlConfiguration` object to a state descriptor. The following Java example shows how to create a state TTL configuration and provide it to the state descriptor that holds the last login time of a user as a `Long` value: + +```java +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.state.ValueStateDescriptor; + +StateTtlConfig ttlCon
[flink-web] 02/02: rebuild site
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit a47c0cff9575dd447a1c4d275de3428ecca49269 Author: Robert Metzger AuthorDate: Wed May 22 15:24:00 2019 +0200 rebuild site --- content/2019/05/19/state-ttl.html | 335 ++ content/blog/index.html | 36 ++-- content/blog/page2/index.html | 38 +++-- content/blog/page3/index.html | 42 +++-- content/blog/page4/index.html | 42 +++-- content/blog/page5/index.html | 40 +++-- content/blog/page6/index.html | 40 +++-- content/blog/page7/index.html | 40 +++-- content/blog/page8/index.html | 40 +++-- content/blog/page9/index.html | 25 +++ content/index.html| 6 +- content/zh/index.html | 6 +- 12 files changed, 564 insertions(+), 126 deletions(-) diff --git a/content/2019/05/19/state-ttl.html b/content/2019/05/19/state-ttl.html new file mode 100644 index 000..3c0661a --- /dev/null +++ b/content/2019/05/19/state-ttl.html @@ -0,0 +1,335 @@ + + + + + + + +Apache Flink: State TTL in Flink 1.8.0: How to Automatically Cleanup Application State in Apache Flink + + + + +https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css";> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +What is Apache Flink? + + +Use Cases + + +Powered By + + +FAQ + + + + + +Downloads + + + + https://ci.apache.org/projects/flink/flink-docs-release-1.8/quickstart/setup_quickstart.html"; target="_blank">Tutorials + + + + + Documentation + +https://ci.apache.org/projects/flink/flink-docs-release-1.8"; target="_blank">1.8 (Latest stable release) +https://ci.apache.org/projects/flink/flink-docs-master"; target="_blank">1.9 (Snapshot) + + + + +Getting Help + + +Flink Blog + + + + + + +Community & Project Info + + +Roadmap + + +How to Contribute + + + + https://github.com/apache/flink"; target="_blank">Flink on GitHub + + + + + + + + + 中文版 + + + + + + + + + + +https://twitter.com/apacheflink"; target="_blank">@ApacheFlink + + +Plan Visualizer + + + + + + + + + + + State TTL in Flink 1.8.0: How to Automatically Cleanup Application State in Apache Flink + + +19 May 2019 Fabian Hueske (https://twitter.com/fhueske";>@fhueske) & Andrey Zagrebin + +A common requirement for many stateful streaming applications is to automatically cleanup application state for effective management of your state size, or to control how long the application state can be accessed (e.g. due to legal regulations like the GDPR). The state time-to-live (TTL) feature was initiated in Flink 1.6.0 and enabled application state cleanup and efficient state size management in Apache Flink. + +In this post, we motivate the State TTL feature and discuss its use cases. Moreover, we show how to use and configure it. We explain how Flink internally manages state with TTL and present some exciting additions to the feature in Flink 1.8.0. The blog post concludes with an outlook on future improvements and extensions. + +The Transient Nature of State + +There are two major reasons why state should be maintained only for a limited time. For example, let’s imagine a Flink application that ingests a stream of user login events and stores for each user the time of the last login to improve the experience of frequent visitors. + + + +Controlling the size of state. +Being able to efficiently manage an ever-growing state size is a primary use case for state TTL. Oftentimes, data needs to be persisted temporarily while there is some user activity around it, e.g. web sessions. When the activity ends there is no longer interest in that data while it still occupies storage. Flink 1.8.0 introduces background cleanup of old state
[flink] branch master updated: [FLINK-12566][table] Remove row interval type
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 02a0cf3 [FLINK-12566][table] Remove row interval type 02a0cf3 is described below commit 02a0cf3d4e26f0217b87b7aaef604a6f39a56977 Author: Timo Walther AuthorDate: Tue May 21 11:43:28 2019 +0200 [FLINK-12566][table] Remove row interval type This closes #8497. --- .../table/expressions/ApiExpressionUtils.java | 5 ++- .../table/expressions/ValueLiteralExpression.java | 3 -- .../flink/table/typeutils/RowIntervalTypeInfo.java | 41 -- .../codegen/agg/batch/WindowCodeGenerator.scala| 20 +-- .../flink/table/plan/util/AggregateUtil.scala | 33 + .../expressions/rules/OverWindowResolverRule.java | 4 +-- .../operations/AggregateOperationFactory.java | 12 +++ .../table/expressions/PlannerExpressionUtils.scala | 7 ++-- .../org/apache/flink/table/expressions/call.scala | 14 .../apache/flink/table/expressions/literals.scala | 3 +- .../flink/table/expressions/overOffsets.scala | 7 ++-- .../table/runtime/aggregate/AggregateUtil.scala| 7 ++-- .../table/api/batch/table/GroupWindowTest.scala| 8 ++--- .../table/GroupWindowTableAggregateTest.scala | 8 ++--- .../table/api/stream/table/GroupWindowTest.scala | 10 +++--- 15 files changed, 67 insertions(+), 115 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java index 2bbfaa2..1ef8db3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.typeutils.RowIntervalTypeInfo; import org.apache.flink.table.typeutils.TimeIntervalTypeInfo; import java.util.Arrays; @@ -124,10 +123,10 @@ public final class ApiExpressionUtils { public static Expression toRowInterval(Expression e) { final Optional intInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO) - .map((v) -> valueLiteral((long) v, RowIntervalTypeInfo.INTERVAL_ROWS)); + .map((v) -> valueLiteral((long) v, BasicTypeInfo.LONG_TYPE_INFO)); final Optional longInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.LONG_TYPE_INFO) - .map((v) -> valueLiteral(v, RowIntervalTypeInfo.INTERVAL_ROWS)); + .map((v) -> valueLiteral(v, BasicTypeInfo.LONG_TYPE_INFO)); if (intInterval.isPresent()) { return intInterval.get(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java index f8138f6..186c90c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.table.typeutils.RowIntervalTypeInfo; import org.apache.flink.table.typeutils.TimeIntervalTypeInfo; import org.apache.flink.util.Preconditions; @@ -109,8 +108,6 @@ public final class ValueLiteralExpression implements Expression { return value + ".millis"; } else if (type.equals(TimeIntervalTypeInfo.INTERVAL_MONTHS)) { return value + ".months"; - } else if (type.equals(RowIntervalTypeInfo.INTERVAL_ROWS)) { - return value + ".rows"; } else { return stringifyValue(value); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java deleted file mode 100644 index a7cd362..000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/
[flink] branch release-1.7 updated: [FLINK-12578][build] Use https URL for Maven repositories
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.7 by this push: new 69b9a39 [FLINK-12578][build] Use https URL for Maven repositories 69b9a39 is described below commit 69b9a3908cca80dc41db1d15b2968670887cff6e Author: Jungtaek Lim AuthorDate: Wed May 22 19:16:02 2019 +0900 [FLINK-12578][build] Use https URL for Maven repositories MapR repositories are excluded from this change as there appears to be some certificate issue on the MapR side. The latest documentation (https://mapr.com/docs/61/DevelopmentGuide/MavenArtifacts.html) also recommends using the http url. --- flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml | 2 +- flink-formats/flink-avro-confluent-registry/pom.xml| 2 +- pom.xml| 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml index ea81997..832b243 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml @@ -35,7 +35,7 @@ under the License. confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml index 9b93e35..7307772 100644 --- a/flink-formats/flink-avro-confluent-registry/pom.xml +++ b/flink-formats/flink-avro-confluent-registry/pom.xml @@ -32,7 +32,7 @@ under the License. confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ diff --git a/pom.xml b/pom.xml index 5fd305a..d93cdd0 100644 --- a/pom.xml +++ b/pom.xml @@ -946,14 +946,14 @@ under the License. HDPReleases HDP Releases - http://repo.hortonworks.com/content/repositories/releases/ + https://repo.hortonworks.com/content/repositories/releases/ false true HortonworksJettyHadoop HDP Jetty - http://repo.hortonworks.com/content/repositories/jetty-hadoop + https://repo.hortonworks.com/content/repositories/jetty-hadoop false true
[flink] branch release-1.8 updated: [FLINK-12578][build] Use https URL for Maven repositories
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 923ffec [FLINK-12578][build] Use https URL for Maven repositories 923ffec is described below commit 923ffec16b1c57b1d3d705ec7f32db5cea951225 Author: Jungtaek Lim AuthorDate: Wed May 22 19:16:02 2019 +0900 [FLINK-12578][build] Use https URL for Maven repositories MapR repositories are excluded from this change as there appears to be some certificate issue on the MapR side. The latest documentation (https://mapr.com/docs/61/DevelopmentGuide/MavenArtifacts.html) also recommends using the http url. --- flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml | 2 +- flink-formats/flink-avro-confluent-registry/pom.xml| 2 +- pom.xml| 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml index 0dc5869..7b61824 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml @@ -35,7 +35,7 @@ under the License. confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml index 4c99f71..10dad8d 100644 --- a/flink-formats/flink-avro-confluent-registry/pom.xml +++ b/flink-formats/flink-avro-confluent-registry/pom.xml @@ -32,7 +32,7 @@ under the License. confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ diff --git a/pom.xml b/pom.xml index 6a9f52c..21f4265 100644 --- a/pom.xml +++ b/pom.xml @@ -974,14 +974,14 @@ under the License. HDPReleases HDP Releases - http://repo.hortonworks.com/content/repositories/releases/ + https://repo.hortonworks.com/content/repositories/releases/ false true HortonworksJettyHadoop HDP Jetty - http://repo.hortonworks.com/content/repositories/jetty-hadoop + https://repo.hortonworks.com/content/repositories/jetty-hadoop false true
[flink] 03/10: [hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e6b8423ee34c2c49f0ea6eb0955ced5db57dd18f Author: Andrey Zagrebin AuthorDate: Sat May 11 19:49:30 2019 +0200 [hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests --- .../partition/consumer/RemoteInputChannel.java | 11 -- .../runtime/io/network/NetworkEnvironmentTest.java | 20 +-- .../netty/PartitionRequestClientHandlerTest.java | 22 +--- .../network/partition/InputChannelTestUtils.java | 47 ++- .../network/partition/InputGateConcurrentTest.java | 12 +- .../network/partition/InputGateFairnessTest.java | 10 +- .../partition/consumer/InputChannelBuilder.java| 145 + .../partition/consumer/LocalInputChannelTest.java | 19 +-- .../partition/consumer/RemoteInputChannelTest.java | 68 +++--- .../partition/consumer/SingleInputGateTest.java| 107 --- 10 files changed, 228 insertions(+), 233 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 87e3f62..30246c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -108,17 +108,6 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, - InputChannelMetrics metrics) { - - this(inputGate, channelIndex, partitionId, connectionId, connectionManager, 0, 0, metrics); - } - - public RemoteInputChannel( - SingleInputGate inputGate, - int channelIndex, - ResultPartitionID partitionId, - ConnectionID connectionId, - ConnectionManager connectionManager, int initialBackOff, int maxBackoff, InputChannelMetrics metrics) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index a610838..81cd8f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -19,11 +19,10 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.taskmanager.Task; @@ -288,7 +287,7 @@ public class NetworkEnvironmentTest { * @param numberOfChannels * the number of input channels * -* @return input gate with some fake settings +* @return input gate with some fake settiFngs */ private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) { return spy(new SingleInputGateBuilder() @@ -303,15 +302,10 @@ public class NetworkEnvironmentTest { int channelIndex, ResultPartition resultPartition, ConnectionManager connManager) { - RemoteInputChannel channel = new RemoteInputChannel( - inputGate, - channelIndex, - resultPartition.getPartitionId(), - mock(ConnectionID.class), - connManager, - 0, - 0, - InputChannelTestUtils.newUnregisteredInputChannelMetrics()); - inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), channel); + InputChannelBuilder.newBuilder() + .setChannelIndex(channelIndex) + .setPartitionId(resultPartition.getPartitionId
[flink] 02/10: [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit efa7abf2c927958447d6533a3cbd5ccc105a254f Author: Zhijiang AuthorDate: Fri Apr 26 18:31:33 2019 +0800 [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests --- .../runtime/io/network/NetworkEnvironmentTest.java | 9 +- .../io/network/partition/PartitionTestUtils.java | 43 +++- .../network/partition/ResultPartitionBuilder.java | 113 + .../partition/consumer/LocalInputChannelTest.java | 32 ++ .../StreamNetworkBenchmarkEnvironment.java | 22 ++-- 5 files changed, 148 insertions(+), 71 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index bcb4d04..a610838 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -20,8 +20,8 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; -import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; @@ -272,8 +272,11 @@ public class NetworkEnvironmentTest { */ private static ResultPartition createResultPartition( final ResultPartitionType partitionType, final int channels) { - - return PartitionTestUtils.createPartition(partitionType, channels); + return new ResultPartitionBuilder() + .setResultPartitionType(partitionType) + .setNumberOfSubpartitions(channels) + .setNumTargetKeyGroups(channels) + .build(); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java index 3391030..b52bffe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager; -import org.apache.flink.runtime.taskmanager.NoOpTaskActions; - /** * This class should consolidate all mocking logic for ResultPartitions. * While using Mockito internally (for now), the use of Mockito should not @@ -34,41 +30,26 @@ public class PartitionTestUtils { } public static ResultPartition createPartition(ResultPartitionType type) { - return createPartition( - new NoOpResultPartitionConsumableNotifier(), - type, - false); - } - - public static ResultPartition createPartition(ResultPartitionType type, int numChannels) { - return createPartition(new NoOpResultPartitionConsumableNotifier(), type, numChannels, false); + return new ResultPartitionBuilder().setResultPartitionType(type).build(); } public static ResultPartition createPartition( ResultPartitionConsumableNotifier notifier, ResultPartitionType type, boolean sendScheduleOrUpdateConsumersMessage) { - - return createPartition(notifier, type, 1, sendScheduleOrUpdateConsumersMessage); + return new ResultPartitionBuilder() + .setResultPartitionConsumableNotifier(notifier) + .setResultPartitionType(type) + .setSendScheduleOrUpdateConsumersMessage(sendScheduleOrUpdateConsumersMessage) + .build(); } public static ResultPartition createPartition( - ResultPartitionConsumableNotifier notifier, - ResultPartitionType type, - int numChannels, - boolean sendScheduleOrUpdateConsumersMessage) { - - return new ResultP
[flink] 10/10: [FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup()
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3d81425de91ecbab715f8a7d169e0a7796d19c51 Author: Andrey Zagrebin AuthorDate: Fri May 10 22:14:07 2019 +0200 [FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup() Move input gate setup from NetworkEnvironment to InputGate. This eliminates tie between Task and NetworkEnvironment. Task does not need depend on NetworkEnvironment and can trigger setup from InputGate directly. --- .../runtime/io/network/NetworkEnvironment.java | 57 -- .../io/network/partition/consumer/InputGate.java | 5 ++ .../partition/consumer/SingleInputGate.java| 19 +++- .../partition/consumer/SingleInputGateFactory.java | 55 ++--- .../network/partition/consumer/UnionInputGate.java | 4 ++ .../org/apache/flink/runtime/taskmanager/Task.java | 28 ++- .../runtime/io/network/NetworkEnvironmentTest.java | 49 +-- .../network/partition/InputGateFairnessTest.java | 8 ++- .../network/partition/ResultPartitionBuilder.java | 2 +- .../partition/consumer/InputGateTestBase.java | 16 -- .../partition/consumer/SingleInputGateBuilder.java | 25 +- .../partition/consumer/SingleInputGateTest.java| 16 +++--- .../apache/flink/runtime/taskmanager/TaskTest.java | 52 ++-- .../runtime/io/BarrierBufferMassiveRandomTest.java | 4 ++ .../flink/streaming/runtime/io/MockInputGate.java | 4 ++ .../StreamNetworkBenchmarkEnvironment.java | 2 +- 16 files changed, 219 insertions(+), 127 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index de669bb..1f2ee7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge; import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge; @@ -47,9 +46,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskActions; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -190,60 +187,6 @@ public class NetworkEnvironment { return config; } - // - // Task operations - // - - public void registerTask(Task task) throws IOException { - final ResultPartition[] producedPartitions = task.getProducedPartitions(); - - synchronized (lock) { - if (isShutdown) { - throw new IllegalStateException("NetworkEnvironment is shut down"); - } - - for (final ResultPartition partition : producedPartitions) { - partition.setup(); - } - - // Setup the buffer pool for each buffer reader - final SingleInputGate[] inputGates = task.getAllInputGates(); - for (SingleInputGate gate : inputGates) { - setupInputGate(gate); - } - } - } - - @VisibleForTesting - public void setupInputGate(SingleInputGate gate) throws IOException { - BufferPool bufferPool = null; - int maxNumberOfMemorySegments; - try { - if (config.isCreditBased()) { - maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; - -
[flink] 08/10: [hotfix][network] Move MemorySegmentProvider from SingleInputGate to RemoteInputChannel
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 8589c82b4cbc8a73417974107186f9ccc3a629d3 Author: Andrey Zagrebin AuthorDate: Fri May 10 20:25:59 2019 +0200 [hotfix][network] Move MemorySegmentProvider from SingleInputGate to RemoteInputChannel --- .../runtime/io/network/NetworkEnvironment.java | 6 +- .../partition/consumer/RemoteInputChannel.java | 21 +-- .../partition/consumer/SingleInputGate.java| 33 ++ .../partition/consumer/SingleInputGateFactory.java | 14 - .../partition/consumer/UnknownInputChannel.java| 13 +++- .../runtime/io/network/NetworkEnvironmentTest.java | 41 +++-- ...editBasedPartitionRequestClientHandlerTest.java | 21 --- .../netty/PartitionRequestClientHandlerTest.java | 54 ++-- .../network/netty/PartitionRequestClientTest.java | 17 -- .../network/partition/InputChannelTestUtils.java | 71 ++ .../partition/consumer/InputChannelBuilder.java| 14 - .../partition/consumer/RemoteInputChannelTest.java | 63 +-- .../partition/consumer/SingleInputGateTest.java| 3 +- .../StreamNetworkBenchmarkEnvironment.java | 3 +- 14 files changed, 225 insertions(+), 149 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 97016ef..459669c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -138,8 +138,8 @@ public class NetworkEnvironment { ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(resultPartitionManager, ioManager); - SingleInputGateFactory singleInputGateFactory = - new SingleInputGateFactory(config, connectionManager, resultPartitionManager, taskEventPublisher); + SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory( + config, connectionManager, resultPartitionManager, taskEventPublisher, networkBufferPool); return new NetworkEnvironment( config, @@ -247,7 +247,7 @@ public class NetworkEnvironment { config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; // assign exclusive buffers to input channels directly and use the rest for floating buffers - gate.assignExclusiveSegments(networkBufferPool); + gate.assignExclusiveSegments(); bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments); } else { maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 98182c6..397c4fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentProvider; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -34,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.util.ExceptionUtils; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -103,6 +105,10 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, @GuardedBy("bufferQueue") private boolean isWaitingForFloatingBuffers; + /** Global memory segment provider to request and recycle exclusive buffers (only for credit-based). */ + @Nonnull + private final MemorySegmentProvider memorySegmentProvider; + public RemoteInputChannel( SingleInputGate inputGate, int channelIndex, @@ -111,23 +117,26 @@ public class RemoteInputChannel extends InputChannel implements Buf
[flink] 09/10: [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup()
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e99eee4f0bf9b2d851fa12ea52007e8f083c366f Author: Andrey Zagrebin AuthorDate: Fri May 10 20:56:25 2019 +0200 [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup() Move partition setup from NetworkEnvironment to ResultPartition. This eliminates tie between Task and NetworkEnvironment. Task does not need depend on NetworkEnvironment and can trigger setup from ResultPartitionWriter interface. --- .../runtime/io/network/NetworkEnvironment.java | 47 +--- .../network/api/writer/ResultPartitionWriter.java | 5 ++ .../io/network/partition/ResultPartition.java | 22 +--- .../network/partition/ResultPartitionFactory.java | 49 +++-- .../runtime/io/network/NetworkEnvironmentTest.java | 39 -- .../AbstractCollectingResultPartitionWriter.java | 4 ++ .../io/network/api/writer/RecordWriterTest.java| 8 +++ .../io/network/partition/PartitionTestUtils.java | 4 ++ .../network/partition/ResultPartitionBuilder.java | 63 -- .../io/network/partition/ResultPartitionTest.java | 5 +- .../partition/consumer/LocalInputChannelTest.java | 9 ++-- .../StreamNetworkBenchmarkEnvironment.java | 3 +- 12 files changed, 168 insertions(+), 90 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 459669c..de669bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -57,7 +57,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; -import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -135,11 +134,19 @@ public class NetworkEnvironment { registerNetworkMetrics(metricGroup, networkBufferPool); ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - ResultPartitionFactory resultPartitionFactory = - new ResultPartitionFactory(resultPartitionManager, ioManager); + ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( + resultPartitionManager, + ioManager, + networkBufferPool, + config.networkBuffersPerChannel(), + config.floatingNetworkBuffersPerGate()); SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory( - config, connectionManager, resultPartitionManager, taskEventPublisher, networkBufferPool); + config, + connectionManager, + resultPartitionManager, + taskEventPublisher, + networkBufferPool); return new NetworkEnvironment( config, @@ -196,7 +203,7 @@ public class NetworkEnvironment { } for (final ResultPartition partition : producedPartitions) { - setupPartition(partition); + partition.setup(); } // Setup the buffer pool for each buffer reader @@ -208,36 +215,6 @@ public class NetworkEnvironment { } @VisibleForTesting - public void setupPartition(ResultPartition partition) throws IOException { - BufferPool bufferPool = null; - - try { - int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ? - partition.getNumberOfSubpartitions() * config.networkBuffersPerChannel() + - config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; - // If the partition type is back pressure-free, we register with the buffer pool for - // callbacks to release memory. - bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), - maxNumberOfMemorySegments, - partition.getPartitionType().hasBackPressure() ? Optional.empty() : Optional.of(partition)); - - partition.registerBufferPool(bufferPool); - - resultPartitionManager.registerResultPartition(partition); - } catch (Throwable t) { - if (bufferPool != null) { -
[flink] 04/10: [hotfix][tests][network] Introduce NetworkEnvironment.create factory method
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e677bb6dcc38dfb3b5bf8b20e4a2d324b9e20ab5 Author: Andrey Zagrebin AuthorDate: Sun May 12 12:55:01 2019 +0200 [hotfix][tests][network] Introduce NetworkEnvironment.create factory method --- .../runtime/io/network/NetworkEnvironment.java | 47 +++--- .../runtime/taskexecutor/TaskManagerServices.java | 2 +- .../io/network/NetworkEnvironmentBuilder.java | 2 +- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index b9b35c9..ea482f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -92,31 +92,50 @@ public class NetworkEnvironment { private boolean isShutdown; - public NetworkEnvironment( + private NetworkEnvironment( NetworkEnvironmentConfiguration config, + NetworkBufferPool networkBufferPool, + ConnectionManager connectionManager, + ResultPartitionManager resultPartitionManager, TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup, IOManager ioManager) { - this.config = checkNotNull(config); + this.config = config; + this.networkBufferPool = networkBufferPool; + this.connectionManager = connectionManager; + this.resultPartitionManager = resultPartitionManager; + this.taskEventPublisher = taskEventPublisher; + this.ioManager = ioManager; + this.isShutdown = false; + } - this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); + public static NetworkEnvironment create( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + MetricGroup metricGroup, + IOManager ioManager) { + checkNotNull(ioManager); + checkNotNull(taskEventPublisher); + checkNotNull(config); NettyConfig nettyConfig = config.nettyConfig(); - if (nettyConfig != null) { - this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased()); - } else { - this.connectionManager = new LocalConnectionManager(); - } - - this.resultPartitionManager = new ResultPartitionManager(); + ConnectionManager connectionManager = nettyConfig != null ? + new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager(); - this.taskEventPublisher = checkNotNull(taskEventPublisher); + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + config.numNetworkBuffers(), + config.networkBufferSize()); registerNetworkMetrics(metricGroup, networkBufferPool); - this.ioManager = checkNotNull(ioManager); + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - isShutdown = false; + return new NetworkEnvironment( + config, + networkBufferPool, + connectionManager, + resultPartitionManager, + taskEventPublisher, + ioManager); } private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index e2ec942..e19e8fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -246,7 +246,7 @@ public class TaskManagerServices { // start the I/O manager, it will create some temp directories. final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); - final NetworkEnvironment network = new NetworkEnvironment( + final NetworkEnvironment network = NetworkEnvironment.create(
[flink] 07/10: [hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 2375693f1cc80118bf61922f027942a0f101c486 Author: Andrey Zagrebin AuthorDate: Fri May 10 19:01:31 2019 +0200 [hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments --- .../flink/core/memory/MemorySegmentProvider.java | 31 +++ .../runtime/io/network/NetworkEnvironment.java | 5 +- .../io/network/buffer/NetworkBufferPool.java | 35 +++- .../partition/consumer/RemoteInputChannel.java | 5 +- .../partition/consumer/SingleInputGate.java| 29 -- .../runtime/io/network/NetworkEnvironmentTest.java | 16 +++--- .../io/network/buffer/BufferPoolFactoryTest.java | 12 ++-- .../network/buffer/LocalBufferPoolDestroyTest.java | 2 +- .../io/network/buffer/LocalBufferPoolTest.java | 2 +- .../io/network/buffer/NetworkBufferPoolTest.java | 65 ++ ...editBasedPartitionRequestClientHandlerTest.java | 15 ++--- .../netty/PartitionRequestClientHandlerTest.java | 5 +- .../network/netty/PartitionRequestClientTest.java | 12 ++-- .../partition/consumer/LocalInputChannelTest.java | 2 +- .../partition/consumer/RemoteInputChannelTest.java | 40 ++--- .../BackPressureStatsTrackerImplITCase.java| 2 +- .../runtime/io/BarrierBufferMassiveRandomTest.java | 4 +- 17 files changed, 150 insertions(+), 132 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java new file mode 100644 index 000..83b2d19 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentProvider.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import java.io.IOException; +import java.util.Collection; + +/** + * The provider used for requesting and releasing batch of memory segments. + */ +public interface MemorySegmentProvider { + Collection requestMemorySegments() throws IOException; + + void recycleMemorySegments(Collection segments) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index d0e8263..97016ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -129,7 +129,8 @@ public class NetworkEnvironment { NetworkBufferPool networkBufferPool = new NetworkBufferPool( config.numNetworkBuffers(), - config.networkBufferSize()); + config.networkBufferSize(), + config.networkBuffersPerChannel()); registerNetworkMetrics(metricGroup, networkBufferPool); @@ -246,7 +247,7 @@ public class NetworkEnvironment { config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; // assign exclusive buffers to input channels directly and use the rest for floating buffers - gate.assignExclusiveSegments(networkBufferPool, config.networkBuffersPerChannel()); + gate.assignExclusiveSegments(networkBufferPool); bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments); } else { maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 48ce27e..3d6c2da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
[flink] branch master updated (e16fa9f -> 3d81425)
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e16fa9f [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API new 15811a5 [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests new efa7abf [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests new e6b8423 [hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests new e677bb6 [hotfix][tests][network] Introduce NetworkEnvironment.create factory method new e546009 [hotfix][network] Introduce ResultPartitionFactory new c00d879 [hotfix][network] Introduce SingleInputGateFactory new 2375693 [hotfix][network] Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments new 8589c82 [hotfix][network] Move MemorySegmentProvider from SingleInputGate to RemoteInputChannel new e99eee4 [FLINK-12331][network] Refactor NetworkEnvironment#setupPartition() to ResultPartitionWriter#setup() new 3d81425 [FLINK-12331][network] Refactor NetworkEnvironment#setupInputGate() to InputGate#setup() The 10 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: ...{MemoryType.java => MemorySegmentProvider.java} | 19 +- .../runtime/io/network/NetworkEnvironment.java | 179 ++ .../network/api/writer/ResultPartitionWriter.java | 5 + .../io/network/buffer/NetworkBufferPool.java | 35 ++-- .../io/network/partition/ResultPartition.java | 82 ++-- .../network/partition/ResultPartitionFactory.java | 201 .../io/network/partition/consumer/InputGate.java | 5 + .../partition/consumer/RemoteInputChannel.java | 33 ++-- .../partition/consumer/SingleInputGate.java| 153 +++ .../partition/consumer/SingleInputGateFactory.java | 207 + .../network/partition/consumer/UnionInputGate.java | 4 + .../partition/consumer/UnknownInputChannel.java| 13 +- .../runtime/taskexecutor/TaskManagerServices.java | 2 +- .../org/apache/flink/runtime/taskmanager/Task.java | 28 +-- .../io/network/NetworkEnvironmentBuilder.java | 2 +- .../runtime/io/network/NetworkEnvironmentTest.java | 153 +++ .../AbstractCollectingResultPartitionWriter.java | 4 + .../io/network/api/writer/RecordWriterTest.java| 8 + .../io/network/buffer/BufferPoolFactoryTest.java | 12 +- .../network/buffer/LocalBufferPoolDestroyTest.java | 2 +- .../io/network/buffer/LocalBufferPoolTest.java | 2 +- .../io/network/buffer/NetworkBufferPoolTest.java | 65 +++ ...editBasedPartitionRequestClientHandlerTest.java | 30 +-- .../netty/PartitionRequestClientHandlerTest.java | 67 +-- .../network/netty/PartitionRequestClientTest.java | 25 ++- .../network/partition/InputChannelTestUtils.java | 140 -- .../network/partition/InputGateConcurrentTest.java | 12 +- .../network/partition/InputGateFairnessTest.java | 18 +- .../io/network/partition/PartitionTestUtils.java | 45 ++--- .../network/partition/ResultPartitionBuilder.java | 165 .../io/network/partition/ResultPartitionTest.java | 5 +- .../partition/consumer/InputChannelBuilder.java| 155 +++ .../partition/consumer/InputGateTestBase.java | 20 +- .../partition/consumer/LocalInputChannelTest.java | 78 +++- .../partition/consumer/RemoteInputChannelTest.java | 151 +++ .../partition/consumer/SingleInputGateBuilder.java | 105 +++ .../partition/consumer/SingleInputGateTest.java| 143 +- .../BackPressureStatsTrackerImplITCase.java| 2 +- .../apache/flink/runtime/taskmanager/TaskTest.java | 52 +- .../runtime/io/BarrierBufferMassiveRandomTest.java | 8 +- .../flink/streaming/runtime/io/MockInputGate.java | 4 + .../StreamNetworkBenchmarkEnvironment.java | 50 ++--- 42 files changed, 1517 insertions(+), 972 deletions(-) copy flink-core/src/main/java/org/apache/flink/core/memory/{MemoryType.java => MemorySegmentProvider.java} (72%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.ja
[flink] 01/10: [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 15811a5d8c7362ac64828b771951dfa8304d98eb Author: Zhijiang AuthorDate: Fri Apr 26 17:13:08 2019 +0800 [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests --- .../runtime/io/network/NetworkEnvironmentTest.java | 7 +- .../network/partition/InputChannelTestUtils.java | 24 +-- .../partition/consumer/InputGateTestBase.java | 10 +-- .../partition/consumer/LocalInputChannelTest.java | 16 ++--- .../partition/consumer/SingleInputGateBuilder.java | 82 ++ 5 files changed, 99 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index f20feb7..bcb4d04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.taskmanager.Task; import org.junit.Rule; @@ -287,7 +288,11 @@ public class NetworkEnvironmentTest { * @return input gate with some fake settings */ private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) { - return spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, partitionType, enableCreditBasedFlowControl)); + return spy(new SingleInputGateBuilder() + .setNumberOfChannels(numberOfChannels) + .setResultPartitionType(partitionType) + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); } private static void createRemoteInputChannel( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index 4feee4e..82b4c5b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; @@ -28,9 +26,8 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -75,24 +72,7 @@ public class InputChannelTestUtils { } public static SingleInputGate createSingleInputGate(int numberOfChannels) { - return createSingleInputGate(numberOfChannels, ResultPartitionType.PIPELINED, true); - } - - public static SingleInputGate createSingleInputGate( - int numberOfChannels, - ResultPartitionType partitionType, - boolean isCreditBased) { - - return new SingleInputGate( - "InputGate", - new JobID(), - new IntermediateDataSetID(), - partitionType, - 0, - numberOfChannels, - new NoOpTaskActions(), - new SimpleCounter(), - isCreditBased); + return new SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).build(); } public static ConnectionManager createDummy
[flink] 05/10: [hotfix][network] Introduce ResultPartitionFactory
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e546009b7360c341d74b53c5d805e84f6276a897 Author: Andrey Zagrebin AuthorDate: Fri May 10 17:33:00 2019 +0200 [hotfix][network] Introduce ResultPartitionFactory --- .../runtime/io/network/NetworkEnvironment.java | 25 ++-- .../io/network/partition/ResultPartition.java | 64 +--- .../network/partition/ResultPartitionFactory.java | 162 + .../network/partition/ResultPartitionBuilder.java | 9 +- 4 files changed, 176 insertions(+), 84 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index ea482f1..7974e83 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; @@ -88,7 +89,7 @@ public class NetworkEnvironment { private final TaskEventPublisher taskEventPublisher; - private final IOManager ioManager; + private final ResultPartitionFactory resultPartitionFactory; private boolean isShutdown; @@ -98,14 +99,14 @@ public class NetworkEnvironment { ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager, TaskEventPublisher taskEventPublisher, - IOManager ioManager) { + ResultPartitionFactory resultPartitionFactory) { this.config = config; this.networkBufferPool = networkBufferPool; this.connectionManager = connectionManager; this.resultPartitionManager = resultPartitionManager; this.taskEventPublisher = taskEventPublisher; - this.ioManager = ioManager; this.isShutdown = false; + this.resultPartitionFactory = resultPartitionFactory; } public static NetworkEnvironment create( @@ -128,6 +129,8 @@ public class NetworkEnvironment { registerNetworkMetrics(metricGroup, networkBufferPool); ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + ResultPartitionFactory resultPartitionFactory = + new ResultPartitionFactory(resultPartitionManager, ioManager); return new NetworkEnvironment( config, @@ -135,7 +138,7 @@ public class NetworkEnvironment { connectionManager, resultPartitionManager, taskEventPublisher, - ioManager); + resultPartitionFactory); } private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { @@ -283,18 +286,8 @@ public class NetworkEnvironment { ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()]; int counter = 0; for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) { - resultPartitions[counter++] = new ResultPartition( - taskName, - taskActions, - jobId, - new ResultPartitionID(rpdd.getPartitionId(), executionId), - rpdd.getPartitionType(), - rpdd.getNumberOfSubpartitions(), - rpdd.getMaxParallelism(), - resultPartitionManager, - partitionConsumableNotifier, - ioManager, - rpdd.sendScheduleOrUpdateConsumersMessage()); + resultPartitions[counter++] = resultPartitionFactory.create( + taskN
[flink] 06/10: [hotfix][network] Introduce SingleInputGateFactory
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c00d87969d6956a9931173e2bd2923493e33f676 Author: Andrey Zagrebin AuthorDate: Fri May 10 17:39:28 2019 +0200 [hotfix][network] Introduce SingleInputGateFactory --- .../runtime/io/network/NetworkEnvironment.java | 20 ++- .../partition/consumer/SingleInputGate.java| 96 - .../partition/consumer/SingleInputGateFactory.java | 158 + .../partition/consumer/SingleInputGateTest.java| 21 +-- .../StreamNetworkBenchmarkEnvironment.java | 22 +-- 5 files changed, 197 insertions(+), 120 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 7974e83..d0e8263 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; @@ -91,6 +92,8 @@ public class NetworkEnvironment { private final ResultPartitionFactory resultPartitionFactory; + private final SingleInputGateFactory singleInputGateFactory; + private boolean isShutdown; private NetworkEnvironment( @@ -99,14 +102,16 @@ public class NetworkEnvironment { ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager, TaskEventPublisher taskEventPublisher, - ResultPartitionFactory resultPartitionFactory) { + ResultPartitionFactory resultPartitionFactory, + SingleInputGateFactory singleInputGateFactory) { this.config = config; this.networkBufferPool = networkBufferPool; this.connectionManager = connectionManager; this.resultPartitionManager = resultPartitionManager; this.taskEventPublisher = taskEventPublisher; - this.isShutdown = false; this.resultPartitionFactory = resultPartitionFactory; + this.singleInputGateFactory = singleInputGateFactory; + this.isShutdown = false; } public static NetworkEnvironment create( @@ -132,13 +137,17 @@ public class NetworkEnvironment { ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(resultPartitionManager, ioManager); + SingleInputGateFactory singleInputGateFactory = + new SingleInputGateFactory(config, connectionManager, resultPartitionManager, taskEventPublisher); + return new NetworkEnvironment( config, networkBufferPool, connectionManager, resultPartitionManager, taskEventPublisher, - resultPartitionFactory); + resultPartitionFactory, + singleInputGateFactory); } private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { @@ -168,6 +177,7 @@ public class NetworkEnvironment { return networkBufferPool; } + @VisibleForTesting public NetworkEnvironmentConfiguration getConfiguration() { return config; } @@ -311,12 +321,10 @@ public class NetworkEnvironment { SingleInputGate[] inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()]; int counter = 0; for (InputGateDeploymentDescriptor igdd : inputGateDeploymentDescriptors) { - inputGates[counter++] = SingleInputGate.create( + inputGates[counter++] = singleInputGateFactory.create( taskName, jobId, igdd, - this, - taskEventPublisher,
[flink] branch master updated: [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e16fa9f [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API e16fa9f is described below commit e16fa9fe1506ec725f5c5abb2b24afa246e17dae Author: Wei Zhong AuthorDate: Tue May 21 16:25:16 2019 +0800 [FLINK-12439][python] Add FileSystem Connector with CSV format support in Python Table API Brief change log: - Add all of the existing descriptor interfaces align Java Table API. - Add FileSystem connector and OldCsv format support. - The `schema(..)` of OldCsv will be added in FLINK-12588. This closes #8488 --- flink-python/pyflink/java_gateway.py | 1 + flink-python/pyflink/table/__init__.py | 5 + flink-python/pyflink/table/table_descriptor.py | 510 + flink-python/pyflink/table/table_environment.py| 91 +++ .../pyflink/table/tests/test_descriptor.py | 618 + .../table/tests/test_environment_completeness.py | 2 +- 6 files changed, 1226 insertions(+), 1 deletion(-) diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index ed1dc89..e5c8330 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -101,6 +101,7 @@ def launch_gateway(): java_import(gateway.jvm, "org.apache.flink.table.api.*") java_import(gateway.jvm, "org.apache.flink.table.api.java.*") java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*") +java_import(gateway.jvm, "org.apache.flink.table.descriptors.*") java_import(gateway.jvm, "org.apache.flink.table.sources.*") java_import(gateway.jvm, "org.apache.flink.table.sinks.*") java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation") diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index dcbc0ab..4ea3b3f 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -40,6 +40,7 @@ from pyflink.table.table_sink import TableSink, CsvTableSink from pyflink.table.table_source import TableSource, CsvTableSource from pyflink.table.types import DataTypes from pyflink.table.window import Tumble, Session, Slide, Over +from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem __all__ = [ 'TableEnvironment', @@ -56,4 +57,8 @@ __all__ = [ 'Session', 'Slide', 'Over', +'Rowtime', +'Schema', +'OldCsv', +'FileSystem', ] diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py new file mode 100644 index 000..ed9a157 --- /dev/null +++ b/flink-python/pyflink/table/table_descriptor.py @@ -0,0 +1,510 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +from abc import ABCMeta + +from py4j.java_gateway import get_method + +from pyflink.java_gateway import get_gateway +from pyflink.util.type_utils import to_java_type + +if sys.version >= '3': +unicode = str + +__all__ = [ +'Rowtime', +'Schema', +'OldCsv', +'FileSystem' +] + + +class Descriptor(object): +""" +Base class of the descriptors that adds a set of string-based, normalized properties for +describing DDL information. + +Typical characteristics of a descriptor are: +- descriptors have a default constructor +- descriptors themselves contain very little logic +- corresponding validators validate the correctness (goal: have a single point of validation) + +A descriptor is similar to a builder in a builder pattern, thus, mutable for building +properties. +""" + +__metaclass__ = ABCMeta + +def __init__(self, j_descriptor): +self._j_descriptor = j_descriptor + +def to_properties(self): +""" +Converts
[flink] branch master updated: [FLINK-12578][build] Use https URL for Maven repositories
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 65477d3 [FLINK-12578][build] Use https URL for Maven repositories 65477d3 is described below commit 65477d365330751cd55db1c3b18e1fc9bd90d321 Author: Jungtaek Lim AuthorDate: Wed May 22 19:16:02 2019 +0900 [FLINK-12578][build] Use https URL for Maven repositories MapR repositories are excluded from this change as there appears to be some certificate issue on the MapR side. The latest documentation (https://mapr.com/docs/61/DevelopmentGuide/MavenArtifacts.html) also recommends using the http url. --- flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml | 2 +- flink-formats/flink-avro-confluent-registry/pom.xml| 2 +- pom.xml| 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml index 71ffb98..12d4950 100644 --- a/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml +++ b/flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml @@ -35,7 +35,7 @@ under the License. confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml index 9c89167..aa6382f 100644 --- a/flink-formats/flink-avro-confluent-registry/pom.xml +++ b/flink-formats/flink-avro-confluent-registry/pom.xml @@ -32,7 +32,7 @@ under the License. confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ diff --git a/pom.xml b/pom.xml index ed9ae64..c7b6ac3 100644 --- a/pom.xml +++ b/pom.xml @@ -1017,14 +1017,14 @@ under the License. HDPReleases HDP Releases - http://repo.hortonworks.com/content/repositories/releases/ + https://repo.hortonworks.com/content/repositories/releases/ false true HortonworksJettyHadoop HDP Jetty - http://repo.hortonworks.com/content/repositories/jetty-hadoop + https://repo.hortonworks.com/content/repositories/jetty-hadoop false true
[flink] 01/02: [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop.
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 022f6cceef65859bc6f172151d09140038297f69 Author: Stefan Richter AuthorDate: Fri May 10 11:20:02 2019 +0200 [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop. This closes #8409. This closes #8431. This also decomposes monolithic run-loops in StreamTask implementations into step-wise calls. --- .../runtime/tasks/OneInputStreamTask.java | 12 +- .../streaming/runtime/tasks/SourceStreamTask.java | 5 +- .../runtime/tasks/StreamIterationHead.java | 105 - .../flink/streaming/runtime/tasks/StreamTask.java | 82 ++- .../runtime/tasks/TwoInputStreamTask.java | 13 +- .../streaming/runtime/tasks/mailbox/Mailbox.java | 36 .../runtime/tasks/mailbox/MailboxImpl.java | 236 + .../runtime/tasks/mailbox/MailboxReceiver.java | 59 ++ .../runtime/tasks/mailbox/MailboxSender.java | 52 + ...heckpointExceptionHandlerConfigurationTest.java | 4 +- .../tasks/StreamTaskCancellationBarrierTest.java | 4 +- .../runtime/tasks/StreamTaskTerminationTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java| 21 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 3 +- .../runtime/tasks/SynchronousCheckpointTest.java | 3 +- .../tasks/TaskCheckpointingBehaviourTest.java | 4 +- .../runtime/tasks/mailbox/MailboxImplTest.java | 170 +++ .../flink/streaming/util/MockStreamTask.java | 4 +- .../jobmaster/JobMasterStopWithSavepointIT.java| 10 +- 19 files changed, 726 insertions(+), 100 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 7498518..7b82d8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -39,8 +39,6 @@ public class OneInputStreamTask extends StreamTask inputProcessor; - private volatile boolean running = true; - private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge(); /** @@ -98,12 +96,9 @@ public class OneInputStreamTask extends StreamTask inputProcessor = this.inputProcessor; - - while (running && inputProcessor.processInput()) { - // all the work happens in the "processInput" method + protected void performDefaultAction(ActionContext context) throws Exception { + if (!inputProcessor.processInput()) { + context.allActionsCompleted(); } } @@ -116,6 +111,5 @@ public class OneInputStreamTask extends StreamTask, OP extends S } @Override - protected void run() throws Exception { + protected void performDefaultAction(ActionContext context) throws Exception { + // Against the usual contract of this method, this implementation is not step-wise but blocking instead for + // compatibility reasons with the current source interface (source functions run as a loop, not in steps). headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + context.allActionsCompleted(); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index ecef7f0..d25bd23 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,88 +42,72 @@ public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); - private volatile boolean running = true; + private RecordWriterOutput[] streamOutputs; + + private final BlockingQueue> dataChannel; + private final String brokerID; + private final long iterationWaitTime; + private final boolean shouldWait; public StreamIterationHead(E
[flink] 02/02: [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845 Author: Stefan Richter AuthorDate: Tue May 14 15:33:48 2019 +0200 [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks. This closes #8442. --- .../streaming/runtime/tasks/SourceStreamTask.java | 66 +- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index fd50a1a..e604f2c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException; public class SourceStreamTask, OP extends StreamSource> extends StreamTask { + private static final Runnable SOURCE_POISON_LETTER = () -> {}; + private volatile boolean externallyInducedCheckpoints; public SourceStreamTask(Environment env) { @@ -101,12 +103,43 @@ public class SourceStreamTask, OP extends S protected void performDefaultAction(ActionContext context) throws Exception { // Against the usual contract of this method, this implementation is not step-wise but blocking instead for // compatibility reasons with the current source interface (source functions run as a loop, not in steps). - headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread(); + sourceThread.start(); + + // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions. + try { + runAlternativeMailboxLoop(); + } catch (Exception mailboxEx) { + // We cancel the source function if some runtime exception escaped the mailbox. + if (!isCanceled()) { + cancelTask(); + } + throw mailboxEx; + } + + sourceThread.join(); + sourceThread.checkThrowSourceExecutionException(); + context.allActionsCompleted(); } + private void runAlternativeMailboxLoop() throws InterruptedException { + + while (true) { + + Runnable letter = mailbox.takeMail(); + if (letter == SOURCE_POISON_LETTER) { + break; + } + + synchronized (getCheckpointLock()) { + letter.run(); + } + } + } + @Override - protected void cancelTask() throws Exception { + protected void cancelTask() { if (headOperator != null) { headOperator.cancel(); } @@ -133,4 +166,33 @@ public class SourceStreamTask, OP extends S } } } + + /** +* Runnable that executes the the source function in the head operator. +*/ + private class LegacySourceFunctionThread extends Thread { + + private Throwable sourceExecutionThrowable; + + LegacySourceFunctionThread() { + this.sourceExecutionThrowable = null; + } + + @Override + public void run() { + try { + headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + } catch (Throwable t) { + sourceExecutionThrowable = t; + } finally { + mailbox.clearAndPut(SOURCE_POISON_LETTER); + } + } + + void checkThrowSourceExecutionException() throws Exception { + if (sourceExecutionThrowable != null) { + throw new Exception(sourceExecutionThrowable); + } + } + } }
[flink] branch master updated (3813bb9 -> ead9139)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3813bb9 [FLINK-12241][hive] Support Flink functions in catalog function APIs of HiveCatalog new 022f6cc [FLINK-12478, FLINK-12480][runtime] Introduce mailbox to StreamTask main-loop. new ead9139 [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runtime/tasks/OneInputStreamTask.java | 12 +- .../streaming/runtime/tasks/SourceStreamTask.java | 71 ++- .../runtime/tasks/StreamIterationHead.java | 105 - .../flink/streaming/runtime/tasks/StreamTask.java | 82 ++- .../runtime/tasks/TwoInputStreamTask.java | 13 +- .../streaming/runtime/tasks/mailbox/Mailbox.java | 23 +- .../runtime/tasks/mailbox/MailboxImpl.java | 236 + .../runtime/tasks/mailbox/MailboxReceiver.java | 59 ++ .../runtime/tasks/mailbox/MailboxSender.java | 52 + ...heckpointExceptionHandlerConfigurationTest.java | 4 +- .../tasks/StreamTaskCancellationBarrierTest.java | 4 +- .../runtime/tasks/StreamTaskTerminationTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java| 21 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 3 +- .../runtime/tasks/SynchronousCheckpointTest.java | 3 +- .../tasks/TaskCheckpointingBehaviourTest.java | 4 +- .../runtime/tasks/mailbox/MailboxImplTest.java | 170 +++ .../flink/streaming/util/MockStreamTask.java | 4 +- .../jobmaster/JobMasterStopWithSavepointIT.java| 10 +- 19 files changed, 762 insertions(+), 117 deletions(-) copy flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoWriter.java => flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mailbox.java (60%) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImpl.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxReceiver.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxSender.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.java