[flink] 06/06: [FLINK-12254][table-common] Add a converter between old type information behavior and data type
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 185ed0e2350b132bbec20b54e6cae6fe72710eae Author: Timo Walther AuthorDate: Tue May 21 15:45:47 2019 +0200 [FLINK-12254][table-common] Add a converter between old type information behavior and data type --- .../types/logical/LegacyTypeInformationType.java | 120 +++ .../utils/LegacyTypeInfoDataTypeConverter.java | 352 + .../flink/table/types/utils/TypeConversions.java | 85 + .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 + 4 files changed, 704 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java new file mode 100644 index 000..a6d6a20 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * This type is a temporary solution to fully support the old type system stack through the new + * stack. Many types can be mapped directly to the new type system, however, some types such as + * {@code DECIMAL}, POJOs, or case classes need special handling. + * + * This type differs from {@link TypeInformationAnyType}. This type is allowed to travel through + * the stack whereas {@link TypeInformationAnyType} should be resolved eagerly to {@link AnyType} by + * the planner. + * + * This class can be removed once we have removed all deprecated methods that take or return + * {@link TypeInformation}. 
+ * + * @see LegacyTypeInfoDataTypeConverter + */ +@Internal +public final class LegacyTypeInformationType extends LogicalType { + + private static final String FORMAT = "LEGACY(%s)"; + + private final TypeInformation typeInfo; + + public LegacyTypeInformationType(LogicalTypeRoot logicalTypeRoot, TypeInformation typeInfo) { + super(true, logicalTypeRoot); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + public TypeInformation getTypeInformation() { + return typeInfo; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new LegacyTypeInformationType<>(getTypeRoot(), typeInfo); + } + + @Override + public String asSerializableString() { + throw new TableException("Legacy type information has no serializable string representation."); + } + + @Override + public String asSummaryString() { + return withNullability(FORMAT, typeInfo); + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return typeInfo.getTypeClass().isAssignableFrom(clazz); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return clazz.isAssignableFrom(typeInfo.getTypeClass()); + } + + @Override + public Class getDefaultConversion() { + return typeInfo.getTypeClass(); + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getCla
[flink] branch master updated (7a3f081 -> 185ed0e)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 7a3f081 [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation new 36fef44 [hotfix][table-common] Add default precision temporal data types new 1ec14eb [hotfix][table-common] Add assumption about expressions and data types new a71c200 [hotfix][table-common] Fix equality of data types with same conversion class new d7d2442 [hotfix][table-common] Add logical type check utilities new 3b13b60 [hotfix][table-common] Fix invalid class to data type conversion new 185ed0e [FLINK-12254][table-common] Add a converter between old type information behavior and data type The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../java/org/apache/flink/table/api/DataTypes.java | 83 + .../apache/flink/table/expressions/Expression.java | 6 +- .../flink/table/types/CollectionDataType.java | 26 +- .../org/apache/flink/table/types/DataType.java | 20 +- .../types/logical/LegacyTypeInformationType.java | 120 +++ .../types/logical/utils/LogicalTypeChecks.java | 87 + .../table/types/utils/ClassDataTypeConverter.java | 2 +- .../utils/LegacyTypeInfoDataTypeConverter.java | 352 + .../flink/table/types/utils/TypeConversions.java | 85 + .../org/apache/flink/table/types/DataTypeTest.java | 6 + .../apache/flink/table/types/DataTypesTest.java| 12 + .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 + 12 files changed, 926 insertions(+), 20 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java
[flink] 02/06: [hotfix][table-common] Add assumption about expressions and data types
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1ec14eb44db7b6ea28f560eb69c6d016890bdb70 Author: Timo Walther AuthorDate: Tue May 21 12:23:20 2019 +0200 [hotfix][table-common] Add assumption about expressions and data types --- .../main/java/org/apache/flink/table/expressions/Expression.java| 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java index 6b68dce..57c73d4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java @@ -19,6 +19,7 @@ package org.apache.flink.table.expressions; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.DataType; import java.util.List; @@ -26,8 +27,11 @@ import java.util.List; * The interface for all expressions. * * Expressions represent a logical tree for producing a computation result. Every expression - * consists of zero or more sub-expressions. Expressions might be literal values, function calls, + * consists of zero, one, or more sub-expressions. Expressions might be literal values, function calls, * or field references. + * + * Expressions are part of the API. Thus, values and return types are expressed as instances of + * {@link DataType}. */ @PublicEvolving public interface Expression {
[flink] 01/06: [hotfix][table-common] Add default precision temporal data types
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 36fef4457a7f1de47989c8a2485581bcf8633b32 Author: Timo Walther AuthorDate: Tue May 21 12:19:04 2019 +0200 [hotfix][table-common] Add default precision temporal data types --- .../java/org/apache/flink/table/api/DataTypes.java | 83 ++ .../apache/flink/table/types/DataTypesTest.java| 12 2 files changed, 95 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java index 2591227..dc7c319 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java @@ -240,6 +240,7 @@ public final class DataTypes { * Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided. * +* @see #TIME() * @see TimeType */ public static DataType TIME(int precision) { @@ -247,6 +248,22 @@ public final class DataTypes { } /** +* Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default. +* +* An instance consists of {@code hour:minute:second} with up to second precision +* and values ranging from {@code 00:00:00} to {@code 23:59:59}. +* +* Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the +* semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided. +* +* @see #TIME(int) +* @see TimeType +*/ + public static DataType TIME() { + return new AtomicDataType(new TimeType()); + } + + /** * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP(p)} where {@code p} is the number * of digits of fractional seconds (=precision). 
{@code p} must have a value between 0 and 9 (both * inclusive). @@ -267,6 +284,26 @@ public final class DataTypes { } /** +* Data type of a timestamp WITHOUT time zone {@code TIMESTAMP} with 6 digits of fractional seconds +* by default. +* +* An instance consists of {@code year-month-day hour:minute:second[.fractional]} with up to +* microsecond precision and values ranging from {@code -01-01 00:00:00.00} to +* {@code -12-31 23:59:59.99}. +* +* Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the +* semantics are closer to {@link java.time.LocalDateTime}. +* +* @see #TIMESTAMP(int) +* @see #TIMESTAMP_WITH_TIME_ZONE(int) +* @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int) +* @see TimestampType +*/ + public static DataType TIMESTAMP() { + return new AtomicDataType(new TimestampType()); + } + + /** * Data type of a timestamp WITH time zone {@code TIMESTAMP(p) WITH TIME ZONE} where {@code p} is * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0 * and 9 (both inclusive). @@ -287,6 +324,26 @@ public final class DataTypes { } /** +* Data type of a timestamp WITH time zone {@code TIMESTAMP WITH TIME ZONE} with 6 digits of fractional +* seconds by default. +* +* An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up +* to microsecond precision and values ranging from {@code -01-01 00:00:00.00 +14:59} to +* {@code -12-31 23:59:59.99 -14:59}. +* +* Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the +* semantics are closer to {@link java.time.OffsetDateTime}. 
+* +* @see #TIMESTAMP_WITH_TIME_ZONE(int) +* @see #TIMESTAMP(int) +* @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int) +* @see ZonedTimestampType +*/ + public static DataType TIMESTAMP_WITH_TIME_ZONE() { + return new AtomicDataType(new ZonedTimestampType()); + } + + /** * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where * {@code p} is the number of digits of fractional seconds (=precision). {@code p} must have a value * between 0 and 9 (both inclusive). @@ -313,6 +370,32 @@ public final class DataTypes { } /** +* Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP WITH LOCAL TIM
[flink] 03/06: [hotfix][table-common] Fix equality of data types with same conversion class
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit a71c200b470ffe7ae614870d5855d1e24673c08b Author: Timo Walther AuthorDate: Tue May 21 13:35:46 2019 +0200 [hotfix][table-common] Fix equality of data types with same conversion class --- .../flink/table/types/CollectionDataType.java | 26 +- .../org/apache/flink/table/types/DataType.java | 20 +++-- .../org/apache/flink/table/types/DataTypeTest.java | 6 + 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java index 17b7096..b50fea9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java @@ -43,7 +43,7 @@ public final class CollectionDataType extends DataType { LogicalType logicalType, @Nullable Class conversionClass, DataType elementDataType) { - super(logicalType, conversionClass); + super(logicalType, ensureArrayConversionClass(logicalType, elementDataType, conversionClass)); this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null."); } @@ -82,16 +82,6 @@ public final class CollectionDataType extends DataType { } @Override - public Class getConversionClass() { - // arrays are a special case because their default conversion class depends on the - // conversion class of the element type - if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) { - return Array.newInstance(elementDataType.getConversionClass(), 0).getClass(); - } - return super.getConversionClass(); - } - - @Override public R accept(DataTypeVisitor visitor) { return visitor.visit(this); } @@ -115,4 
+105,18 @@ public final class CollectionDataType extends DataType { public int hashCode() { return Objects.hash(super.hashCode(), elementDataType); } + + // + + private static Class ensureArrayConversionClass( + LogicalType logicalType, + DataType elementDataType, + @Nullable Class clazz) { + // arrays are a special case because their default conversion class depends on the + // conversion class of the element type + if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) { + return Array.newInstance(elementDataType.getConversionClass(), 0).getClass(); + } + return clazz; + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java index 303a052..6b783e44 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java @@ -52,13 +52,15 @@ import java.util.Objects; @PublicEvolving public abstract class DataType implements Serializable { - protected LogicalType logicalType; + protected final LogicalType logicalType; - protected @Nullable Class conversionClass; + protected final Class conversionClass; DataType(LogicalType logicalType, @Nullable Class conversionClass) { this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null."); - this.conversionClass = performEarlyClassValidation(logicalType, conversionClass); + this.conversionClass = performEarlyClassValidation( + logicalType, + ensureConversionClass(logicalType, conversionClass)); } /** @@ -79,9 +81,6 @@ public abstract class DataType implements Serializable { * @return the expected conversion class */ public Class getConversionClass() { - if (conversionClass == null) { - return logicalType.getDefaultConversion(); - } return conversionClass; } @@ -133,7 +132,7 @@ public abstract class DataType implements 
Serializable { } DataType dataType = (DataType) o;
[flink] 04/06: [hotfix][table-common] Add logical type check utilities
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit d7d24425e47408eb094b7f5aa6ecfcffd7f89db7 Author: Timo Walther AuthorDate: Tue May 21 14:42:02 2019 +0200 [hotfix][table-common] Add logical type check utilities --- .../types/logical/utils/LogicalTypeChecks.java | 87 ++ 1 file changed, 87 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java new file mode 100644 index 000..3958922 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.logical.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +/** + * Utilities for checking {@link LogicalType}. + */ +@Internal +public final class LogicalTypeChecks { + + private static final TimeAttributeChecker TIME_ATTRIBUTE_CHECKER = new TimeAttributeChecker(); + + public static boolean hasRoot(LogicalType logicalType, LogicalTypeRoot typeRoot) { + return logicalType.getTypeRoot() == typeRoot; + } + + public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) { + return logicalType.getTypeRoot().getFamilies().contains(family); + } + + public static boolean isTimeAttribute(LogicalType logicalType) { + return logicalType.accept(TIME_ATTRIBUTE_CHECKER) != TimestampKind.REGULAR; + } + + public static boolean isRowtimeAttribute(LogicalType logicalType) { + return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.ROWTIME; + } + + public static boolean isProctimeAttribute(LogicalType logicalType) { + return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.PROCTIME; + } + + private LogicalTypeChecks() { + // no instantiation + } + + // + + private static class TimeAttributeChecker extends LogicalTypeDefaultVisitor { + + @Override + public TimestampKind visit(TimestampType timestampType) { + return timestampType.getKind(); + } + + @Override + public TimestampKind visit(ZonedTimestampType zonedTimestampType) { + return zonedTimestampType.getKind(); + } + + @Override + public TimestampKind visit(LocalZonedTimestampType localZonedTimestampType) { 
+ return localZonedTimestampType.getKind(); + } + + @Override + protected TimestampKind defaultMethod(LogicalType logicalType) { + // we don't verify that type is actually a timestamp + return TimestampKind.REGULAR; + } + } +}
[flink] 05/06: [hotfix][table-common] Fix invalid class to data type conversion
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3b13b60dc127d075b378d2f2e0a02b1b738c1245 Author: Timo Walther AuthorDate: Tue May 21 14:44:20 2019 +0200 [hotfix][table-common] Fix invalid class to data type conversion --- .../java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java index 599c8b2..a71c682 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java @@ -56,7 +56,7 @@ public final class ClassDataTypeConverter { addDefaultDataType(double.class, DataTypes.DOUBLE()); addDefaultDataType(java.sql.Date.class, DataTypes.DATE()); addDefaultDataType(java.time.LocalDate.class, DataTypes.DATE()); - addDefaultDataType(java.sql.Time.class, DataTypes.TIME(9)); + addDefaultDataType(java.sql.Time.class, DataTypes.TIME(3)); addDefaultDataType(java.time.LocalTime.class, DataTypes.TIME(9)); addDefaultDataType(java.sql.Timestamp.class, DataTypes.TIMESTAMP(9)); addDefaultDataType(java.time.LocalDateTime.class, DataTypes.TIMESTAMP(9));
[flink-shaded] branch master updated: [hotfix][hadoop] Use o.a.f.shaded as patternPrefix
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-shaded.git The following commit(s) were added to refs/heads/master by this push: new 3746069 [hotfix][hadoop] Use o.a.f.shaded as patternPrefix 3746069 is described below commit 37460692afceacce6a6df9fe056428f873c109d9 Author: Chesnay Schepler AuthorDate: Thu May 23 10:56:40 2019 +0200 [hotfix][hadoop] Use o.a.f.shaded as patternPrefix --- flink-shaded-hadoop-2/pom.xml | 16 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-shaded-hadoop-2/pom.xml b/flink-shaded-hadoop-2/pom.xml index 3cbf606..91451be 100644 --- a/flink-shaded-hadoop-2/pom.xml +++ b/flink-shaded-hadoop-2/pom.xml @@ -1026,35 +1026,35 @@ under the License. org.objectweb.asm - org.apache.flink.hadoop2.shaded.org.objectweb.asm + org.apache.flink.shaded.hadoop2.org.objectweb.asm org.jboss.netty - org.apache.flink.hadoop2.shaded.org.jboss.netty + org.apache.flink.shaded.hadoop2.org.jboss.netty io.netty - org.apache.flink.hadoop2.shaded.io.netty + org.apache.flink.shaded.hadoop2.io.netty org.apache.curator - org.apache.flink.hadoop2.shaded.org.apache.curator + org.apache.flink.shaded.hadoop2.org.apache.curator org.apache.http - org.apache.flink.hadoop2.shaded.org.apache.http + org.apache.flink.shaded.hadoop2.org.apache.http org.apache.commons.httpclient - org.apache.flink.hadoop2.shaded.org.apache.commons.httpclient + org.apache.flink.shaded.hadoop2.org.apache.commons.httpclient org.htrace - org.apache.flink.hadoop2.shaded.org.htrace + org.apache.flink.shaded.hadoop2.org.htrace org.codehaus.jackson - org.apache.flink.hadoop2.shaded.org.codehaus.jackson + org.apache.flink.shaded.hadoop2.org.codehaus.jackson
[flink] branch master updated: [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 21b92ac [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase 21b92ac is described below commit 21b92ac95f1824e9b1ca483fa3ffaaf77ef14d4a Author: azagrebin AuthorDate: Thu May 23 11:35:48 2019 +0200 [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase --- .../executiongraph/ExecutionGraphRestartTest.java | 709 ++--- 1 file changed, 331 insertions(+), 378 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 1c5b650..1e6be72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; @@ -35,9 +35,6 @@ import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; -import org.apache.flink.runtime.instance.HardwareDescription; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -45,10 +42,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; +import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import 
org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -58,23 +64,18 @@ import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.After; -import org.junit.Assert; import org.junit.Test; -import javax.annotation.Nonnull; - import java.io.IOException; -import java.net.InetAddress; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import scala.concurrent.duration.FiniteDuration; - import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.crea
[flink] branch master updated: [FLINK-12586][optimizer] Reverse stderr/stdout order
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 396e93c [FLINK-12586][optimizer] Reverse stderr/stdout order 396e93c is described below commit 396e93cc4c5fd4086d715234589b3a991a2c809b Author: liyafan82 <42827532+liyafa...@users.noreply.github.com> AuthorDate: Thu May 23 17:57:03 2019 +0800 [FLINK-12586][optimizer] Reverse stderr/stdout order --- .../org/apache/flink/client/program/OptimizerPlanEnvironment.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java index faacd9f..d524ad9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java @@ -104,8 +104,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment { throw new ProgramInvocationException( "The program plan could not be fetched - the program aborted pre-maturely." - + "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout) - + "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stderr)); + + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr) + + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout)); } //
[flink] branch master updated: [hotfix][docs] Correct documentation of ExEnv#fromElements
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9fde18c [hotfix][docs] Correct documentation of ExEnv#fromElements 9fde18c is described below commit 9fde18cc4c5bbaac8f992121fb709e9cbb97b6b8 Author: Robert Stoll AuthorDate: Thu May 23 12:04:21 2019 +0200 [hotfix][docs] Correct documentation of ExEnv#fromElements --- docs/dev/batch/index.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md index 7d18bac..cb65f0b 100644 --- a/docs/dev/batch/index.md +++ b/docs/dev/batch/index.md @@ -829,7 +829,7 @@ File-based: Collection-based: -- `fromCollection(Collection)` - Creates a data set from the Java Java.util.Collection. All elements +- `fromCollection(Collection)` - Creates a data set from a Java.util.Collection. All elements in the collection must be of the same type. - `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the @@ -970,8 +970,8 @@ File-based: Collection-based: -- `fromCollection(Seq)` - Creates a data set from a Seq. All elements - in the collection must be of the same type. +- `fromCollection(Iterable)` - Creates a data set from an Iterable. All elements + returned by the Iterable must be of the same type. - `fromCollection(Iterator)` - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.
[flink] branch master updated: [FLINK-12497][network] Move ConnectionManager#start arguments to constructor
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 0cb56a4 [FLINK-12497][network] Move ConnectionManager#start arguments to constructor 0cb56a4 is described below commit 0cb56a4dfa9bade080730312ab94b6901c7fb91a Author: zhijiang AuthorDate: Thu May 23 18:38:14 2019 +0800 [FLINK-12497][network] Move ConnectionManager#start arguments to constructor --- .../flink/runtime/io/network/ConnectionManager.java | 3 +-- .../runtime/io/network/LocalConnectionManager.java | 3 +-- .../runtime/io/network/NetworkEnvironment.java | 9 ++--- .../io/network/netty/NettyConnectionManager.java| 21 + .../network/netty/NettyConnectionManagerTest.java | 20 .../io/network/partition/InputChannelTestUtils.java | 3 +-- 6 files changed, 30 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index a84cd8a..75f39e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; -import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import java.io.IOException; @@ -29,7 +28,7 @@ import java.io.IOException; */ public interface ConnectionManager { - void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventDispatcher) throws IOException; + void start() throws IOException; /** * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java index 11cab6b..46ca7fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; -import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; /** * A connection manager implementation to bypass setup overhead for task managers running in local @@ -28,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; public class LocalConnectionManager implements ConnectionManager { @Override - public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) { + public void start() { } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 1f2ee7e..7ee2a20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -120,8 +120,12 @@ public class NetworkEnvironment { checkNotNull(config); NettyConfig nettyConfig = config.nettyConfig(); + + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + ConnectionManager connectionManager = nettyConfig != null ? 
- new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager(); + new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) : + new LocalConnectionManager(); NetworkBufferPool networkBufferPool = new NetworkBufferPool( config.numNetworkBuffers(), @@ -130,7 +134,6 @@ public class NetworkEnvironment { registerNetworkMetrics(metricGroup, networkBufferPool); - ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( resultPartitionManager, ioManager, @@ -280,7 +283,7 @@ public class NetworkEnvironment { try { LOG.debug("Starting network connection manager"); - connectionManager.start(resultPartition
[flink] branch master updated: [hotfix] fix typos. 1. [flink-core] below -> above in TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in LocalStreamEnvironment methods
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new efec532 [hotfix] fix typos. 1. [flink-core] below -> above in TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in LocalStreamEnvironment methods efec532 is described below commit efec53212419b4558ecd6cdf822a9da92e9d2bc2 Author: zhangxin516 AuthorDate: Thu May 16 17:45:24 2019 +0800 [hotfix] fix typos. 1. [flink-core] below -> above in TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in LocalStreamEnvironment methods --- .../main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java | 2 +- .../apache/flink/streaming/api/environment/LocalStreamEnvironment.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java index 2a5f81a..565ac74 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java @@ -108,7 +108,7 @@ public abstract class TypeInformation implements Serializable { /** * Gets the number of logical fields in this type. This includes its nested and transitively nested -* fields, in the case of composite types. In the example below, the OuterType type has three +* fields, in the case of composite types. In the example above, the OuterType type has three * fields in total. * * The total number of fields must be at least 1. 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 43f1464..eca802f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -76,7 +76,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { } /** -* Executes the JobGraph of the on a mini cluster of CLusterUtil with a user +* Executes the JobGraph of the on a mini cluster of ClusterUtil with a user * specified name. * * @param jobName
[flink] 01/02: [FLINK-12254][table] Update TableSchema to new type system
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 98f3046fae76405b3fcf2b5ef82835a81624498e Author: Timo Walther AuthorDate: Tue May 21 16:45:27 2019 +0200 [FLINK-12254][table] Update TableSchema to new type system --- .../flink/table/client/cli/CliResultView.java | 2 +- .../apache/flink/table/client/cli/CliUtils.java| 4 +- .../org/apache/flink/table/api/TableSchema.java| 140 +++-- .../apache/flink/table/api/TableSchemaTest.scala | 8 +- .../api/validation/TableSchemaValidationTest.scala | 6 +- 5 files changed, 113 insertions(+), 47 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java index 67ce5eb..407a04b 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java @@ -145,7 +145,7 @@ public abstract class CliResultView> extends CliView final CliRowView view = new CliRowView( client, resultDescriptor.getResultSchema().getFieldNames(), - CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldTypes()), + CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldDataTypes()), getRow(results.get(selectedRow))); view.open(); // enter view } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java index 77894e8..471d168 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java @@ -18,8 +18,8 @@ package org.apache.flink.table.client.cli; 
-import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.jline.utils.AttributedString; @@ -103,7 +103,7 @@ public final class CliUtils { return fields; } - public static String[] typesToString(TypeInformation[] types) { + public static String[] typesToString(DataType[] types) { final String[] typesAsString = new String[types.length]; for (int i = 0; i < types.length; i++) { typesAsString[i] = types[i].toString(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java index 385f9a0..c6804f9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java @@ -20,8 +20,9 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -33,10 +34,17 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.IntStream; + +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.Field; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** - * A table schema that represents a table's 
structure with field names and types. - */ + * A table schema that represents a table's structure with field names and data types. + */ @PublicEvolving public class TableSchema { @@ -44,20 +52,20 @@ public class TableSchema { private final String[] fieldNames; - private final TypeInformation[] fieldTypes; + private final DataType[] fieldDataTypes; private final Map fieldNameToIndex; - public TableSchema(String[] fieldNames, TypeInformation[] fieldTypes) { + private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) { this.fieldNames = Preconditions.checkNotNull(field
[flink] 02/02: [hotfix][connector-hive] Fix Hive type mapping to Table API type information
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9f19345767e930f4a9fcae89313de79688949f90 Author: Timo Walther AuthorDate: Wed May 22 11:12:23 2019 +0200 [hotfix][connector-hive] Fix Hive type mapping to Table API type information --- .../flink/table/catalog/hive/util/HiveTypeUtil.java | 17 ++--- .../catalog/hive/HiveCatalogGenericMetadataTest.java| 6 +++--- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java index d26d20b..5a94415 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java @@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive.util; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -64,24 +65,18 @@ public class HiveTypeUtil { return serdeConstants.DOUBLE_TYPE_NAME; } else if (type == BasicTypeInfo.STRING_TYPE_INFO) { return serdeConstants.STRING_TYPE_NAME; - } else if (type == BasicTypeInfo.DATE_TYPE_INFO) { + } else if (type == SqlTimeTypeInfo.DATE) { return serdeConstants.DATE_TYPE_NAME; - } else if (type == BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO) { + } else if (type == PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO) { return serdeConstants.BINARY_TYPE_NAME; - } else if (type instanceof SqlTimeTypeInfo) { + } else if (type == 
SqlTimeTypeInfo.TIMESTAMP) { return serdeConstants.TIMESTAMP_TYPE_NAME; - } else if (type instanceof BasicArrayTypeInfo) { - return toHiveArrayType((BasicArrayTypeInfo) type); } else { throw new UnsupportedOperationException( String.format("Flink doesn't support converting type %s to Hive type yet.", type.toString())); } } - private static String toHiveArrayType(BasicArrayTypeInfo arrayTypeInfo) { - return String.format(HIVE_ARRAY_TYPE_NAME_FORMAT, toHiveType(arrayTypeInfo.getComponentInfo())); - } - /** * Convert Hive data type to a Flink data type. * TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT @@ -127,11 +122,11 @@ public class HiveTypeUtil { case DOUBLE: return BasicTypeInfo.DOUBLE_TYPE_INFO; case DATE: - return BasicTypeInfo.DATE_TYPE_INFO; + return SqlTimeTypeInfo.DATE; case TIMESTAMP: return SqlTimeTypeInfo.TIMESTAMP; case BINARY: - return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO; + return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; default: throw new UnsupportedOperationException( String.format("Flink doesn't support Hive primitive type %s yet", hiveType)); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java index ae5f5c89..9a35068 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java @@ -18,8 +18,8 @@ package org.apache.flink.table.catalog.hive; -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import 
org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableSchema; @@ -65,8 +65,8 @@ public c
[flink] branch master updated (efec532 -> 9f19345)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from efec532 [hotfix] fix typos. 1. [flink-core] below -> above in TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in LocalStreamEnvironment methods new 98f3046 [FLINK-12254][table] Update TableSchema to new type system new 9f19345 [hotfix][connector-hive] Fix Hive type mapping to Table API type information The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../table/catalog/hive/util/HiveTypeUtil.java | 17 +-- .../hive/HiveCatalogGenericMetadataTest.java | 6 +- .../flink/table/client/cli/CliResultView.java | 2 +- .../apache/flink/table/client/cli/CliUtils.java| 4 +- .../org/apache/flink/table/api/TableSchema.java| 140 +++-- .../apache/flink/table/api/TableSchemaTest.scala | 8 +- .../api/validation/TableSchemaValidationTest.scala | 6 +- 7 files changed, 122 insertions(+), 61 deletions(-)
[flink] branch master updated (9f19345 -> ed31f4c)
This is an automated email from the ASF dual-hosted git repository. gary pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 9f19345 [hotfix][connector-hive] Fix Hive type mapping to Table API type information new e10e9ef [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval new aa9b899 [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory new f9f43a5 [hotfix][runtime] Move scheduling-related classes to new package new ed31f4c [FLINK-12432][runtime] Add SchedulerNG stub implementation The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../generated/job_manager_configuration.html | 5 -- .../flink/configuration/JobManagerOptions.java | 25 +++ .../dispatcher/DefaultJobManagerRunnerFactory.java | 6 +- .../dispatcher/SchedulerNGFactoryFactory.java | 52 +++ .../restart/RestartStrategyFactory.java| 9 ++- .../restart/ThrowingRestartStrategy.java | 53 +++ .../apache/flink/runtime/jobmaster/JobMaster.java | 2 + .../factories/DefaultJobMasterServiceFactory.java | 2 +- .../DefaultScheduler.java} | 28 .../DefaultSchedulerFactory.java} | 19 ++ .../{jobmaster => scheduler}/LegacyScheduler.java | 3 +- .../LegacySchedulerFactory.java| 2 +- .../{jobmaster => scheduler}/SchedulerNG.java | 3 +- .../SchedulerNGFactory.java| 2 +- .../dispatcher/SchedulerNGFactoryFactoryTest.java | 77 ++ .../ThrowingRestartStrategyFactoryTest.java| 67 +++ .../flink/runtime/jobmaster/JobMasterTest.java | 2 + 17 files changed, 299 insertions(+), 58 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/LegacySchedulerFactory.java => scheduler/DefaultScheduler.java} (78%) copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/LegacySchedulerFactory.java => scheduler/DefaultSchedulerFactory.java} (79%) rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/LegacyScheduler.java (99%) rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/LegacySchedulerFactory.java (98%) rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNG.java (98%) rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNGFactory.java (97%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java
[flink] 01/04: [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e10e9efd22ef76605f9c3a33707221aecccdff0a Author: Gary Yao AuthorDate: Tue May 14 16:43:42 2019 +0200 [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval --- docs/_includes/generated/job_manager_configuration.html | 5 - .../org/apache/flink/configuration/JobManagerOptions.java| 12 2 files changed, 17 deletions(-) diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html index 177c362..73477fe 100644 --- a/docs/_includes/generated/job_manager_configuration.html +++ b/docs/_includes/generated/job_manager_configuration.html @@ -23,11 +23,6 @@ JVM heap size for the JobManager. -jobmanager.resourcemanager.reconnect-interval -2000 -This option specifies the interval in order to trigger a resource manager reconnection if the connection to the resource manager has been lost. This option is only intended for internal use. - - jobmanager.rpc.address (none) The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby J [...] 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index a91b931..69f445f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -120,18 +120,6 @@ public class JobManagerOptions { ).build()); /** -* This option specifies the interval in order to trigger a resource manager reconnection if the connection -* to the resource manager has been lost. -* -* This option is only intended for internal use. -*/ - public static final ConfigOption RESOURCE_MANAGER_RECONNECT_INTERVAL = - key("jobmanager.resourcemanager.reconnect-interval") - .defaultValue(2000L) - .withDescription("This option specifies the interval in order to trigger a resource manager reconnection if the connection" + - " to the resource manager has been lost. This option is only intended for internal use."); - - /** * The location where the JobManager stores the archives of completed jobs. */ public static final ConfigOption ARCHIVE_DIR =
[flink] 02/04: [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit aa9b89919ef10570920fdc6b22cabdf244b70508 Author: Gary Yao AuthorDate: Wed May 15 11:41:38 2019 +0200 [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory --- .../runtime/executiongraph/restart/RestartStrategyFactory.java | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index f15ee0b..b5361ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -22,14 +22,19 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import scala.concurrent.duration.Duration; + +/** + * Factory for {@link RestartStrategy}. + */ public abstract class RestartStrategyFactory implements Serializable { private static final long serialVersionUID = 7320252552640522191L; @@ -37,7 +42,7 @@ public abstract class RestartStrategyFactory implements Serializable { private static final String CREATE_METHOD = "createFactory"; /** -* Factory method to create a restart strategy +* Factory method to create a restart strategy. * @return The created restart strategy */ public abstract RestartStrategy createRestartStrategy();
[flink] 03/04: [hotfix][runtime] Move scheduling-related classes to new package
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f9f43a51653c4d352e2adf3067254a68a33ed389 Author: Gary Yao AuthorDate: Wed May 15 13:24:02 2019 +0200 [hotfix][runtime] Move scheduling-related classes to new package Move SchedulerNG, SchedulerNGFactory, and its implementations to package org.apache.flink.runtime.scheduler. --- .../flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java | 3 +-- .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java| 2 ++ .../runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java| 2 +- .../apache/flink/runtime/{jobmaster => scheduler}/LegacyScheduler.java | 3 ++- .../flink/runtime/{jobmaster => scheduler}/LegacySchedulerFactory.java | 2 +- .../org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNG.java | 3 ++- .../flink/runtime/{jobmaster => scheduler}/SchedulerNGFactory.java | 2 +- .../test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java| 2 ++ 8 files changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java index 6962b6e..b7f80b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java @@ -25,8 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMasterConfiguration; -import org.apache.flink.runtime.jobmaster.LegacySchedulerFactory; -import org.apache.flink.runtime.jobmaster.SchedulerNGFactory; import 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; @@ -36,6 +34,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.scheduler.SchedulerNGFactory; /** * Singleton default factory for {@link JobManagerRunner}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 627cc45..cc7139b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -73,6 +73,8 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; +import org.apache.flink.runtime.scheduler.SchedulerNG; +import org.apache.flink.runtime.scheduler.SchedulerNGFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java index 3f80042..c89ecbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterConfiguration; -import org.apache.flink.runtime.jobmaster.SchedulerNGFactory; +import org.apache.flink.runtime.scheduler.SchedulerNGFactory; import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java similarity index 99% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java rename to flink-runtime/src/mai
[flink] 04/04: [FLINK-12432][runtime] Add SchedulerNG stub implementation
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ed31f4c76aa47ff9ded9c46927e4c1e97510088d Author: Gary Yao AuthorDate: Wed May 15 13:57:15 2019 +0200 [FLINK-12432][runtime] Add SchedulerNG stub implementation Add new SchedulerNG stub implementation, which will represent the future default scheduler. Add feature toggle to switch between existing scheduler and stub implementation. Add ThrowingRestartStrategy to validate that in new scheduling code paths, the legacy restart strategies are not used. This closes #8452. --- .../flink/configuration/JobManagerOptions.java | 13 .../dispatcher/DefaultJobManagerRunnerFactory.java | 3 +- .../dispatcher/SchedulerNGFactoryFactory.java | 52 +++ .../restart/ThrowingRestartStrategy.java | 53 +++ .../flink/runtime/scheduler/DefaultScheduler.java | 78 ++ .../runtime/scheduler/DefaultSchedulerFactory.java | 73 .../dispatcher/SchedulerNGFactoryFactoryTest.java | 77 + .../ThrowingRestartStrategyFactoryTest.java| 67 +++ 8 files changed, 414 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 69f445f..a1d55b1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -160,6 +160,19 @@ public class JobManagerOptions { // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue()) .withDescription("The timeout in milliseconds for a idle slot in Slot Pool."); + /** +* Config parameter determining the scheduler implementation. 
+*/ + @Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.") + public static final ConfigOption SCHEDULER = + key("jobmanager.scheduler") + .defaultValue("legacy") + .withDescription(Description.builder() + .text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:") + .list( + text("'legacy': legacy scheduler"), + text("'ng': new generation scheduler")) + .build()); // - diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java index b7f80b6..c0707c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java @@ -57,8 +57,7 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration); final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration); - final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory( - jobManagerServices.getRestartStrategyFactory()); + final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, jobManagerServices.getRestartStrategyFactory()); final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory( jobMasterConfiguration, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java new file mode 100644 index 000..a757bae --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file
[flink] branch master updated: [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2f39736 [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions 2f39736 is described below commit 2f397367164c8f915304f6317bf0b9dfde615ea9 Author: zhijiang AuthorDate: Fri May 24 00:09:40 2019 +0800 [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions --- .../network_environment_configuration.html | 61 ++ ...ation.html => network_netty_configuration.html} | 0 .../generated/task_manager_configuration.html | 50 - docs/ops/config.md | 8 +- .../flink/addons/hbase/HBaseConnectorITCase.java | 4 +- .../flink/configuration/ConfigConstants.java | 12 +- .../configuration/NetworkEnvironmentOptions.java | 216 + .../flink/configuration/TaskManagerOptions.java| 129 ...TaskManagerHeapSizeCalculationJavaBashTest.java | 19 +- .../configuration/ConfigOptionsDocGenerator.java | 1 - .../io/network/buffer/NetworkBufferPool.java | 14 +- .../runtime/io/network/netty/NettyConfig.java | 67 +-- .../NetworkEnvironmentConfiguration.java | 85 .../runtime/io/network/NetworkEnvironmentTest.java | 6 +- .../network/netty/NettyConnectionManagerTest.java | 7 +- .../PartialConsumePipelinedResultTest.java | 4 +- .../taskexecutor/NetworkBufferCalculationTest.java | 13 +- .../NetworkEnvironmentConfigurationTest.java | 91 - .../taskexecutor/TaskExecutorSubmissionTest.java | 11 +- .../runtime/taskexecutor/TaskExecutorTest.java | 7 +- .../taskexecutor/TaskManagerRunnerStartupTest.java | 3 +- .../TaskManagerServicesConfigurationTest.java | 18 +- .../TaskSubmissionTestEnvironment.java | 8 +- .../TaskCancelAsyncProducerConsumerITCase.java | 3 +- .../streaming/runtime/io/InputProcessorUtil.java | 3 +- .../StreamNetworkThroughputBenchmarkTest.java | 8 +- .../flink/test/cancelling/CancelingTestBase.java | 3 +- 
.../EventTimeWindowCheckpointingITCase.java| 3 +- .../manual/StreamingScalabilityAndLatency.java | 3 +- .../SuccessAfterNetworkBuffersFailureITCase.java | 3 +- ...tractTaskManagerProcessFailureRecoveryTest.java | 3 +- .../JobManagerHAProcessFailureRecoveryITCase.java | 3 +- .../recovery/ProcessFailureCancelingITCase.java| 3 +- .../flink/test/runtime/NettyEpollITCase.java | 4 +- .../apache/flink/yarn/YarnConfigurationITCase.java | 6 +- 35 files changed, 474 insertions(+), 405 deletions(-) diff --git a/docs/_includes/generated/network_environment_configuration.html b/docs/_includes/generated/network_environment_configuration.html new file mode 100644 index 000..7787ed0 --- /dev/null +++ b/docs/_includes/generated/network_environment_configuration.html @@ -0,0 +1,61 @@ + + + +Key +Default +Description + + + + +taskmanager.data.port +0 +The task manager’s port used for data exchange operations. + + +taskmanager.data.ssl.enabled +true +Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true + + +taskmanager.network.detailed-metrics +false +Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths. + + +taskmanager.network.memory.buffers-per-channel +2 +Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. + + + taskmanager.network.memory.floating-buffers-per-gate +8 +Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. 
The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of highe [...] + + +taskmanager.network.memory.fraction +0.1 +Fraction of JVM me
[flink] 05/08: [hotfix][network, tests] Add new unit test for RemoteInputChannel#retriggerSubpartitionRequest
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9f2338cc1999cb57264451ab17d4325beda065c3 Author: Zhijiang AuthorDate: Mon May 20 19:04:45 2019 +0800 [hotfix][network,tests] Add new unit test for RemoteInputChannel#retriggerSubpartitionRequest It is necessary for flip1 to make sure the PartitionNotFoundException would be set on the RemoteInputChannel while retriggering partition request, so a new unit test is added to cover this case. --- .../io/network/TestingConnectionManager.java | 50 ++ .../io/network/TestingPartitionRequestClient.java | 50 ++ .../partition/consumer/RemoteInputChannelTest.java | 25 +++ 3 files changed, 125 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java new file mode 100644 index 000..822314d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +/** + * A dummy implementation of the {@link ConnectionManager} which is mainly used for creating + * {@link PartitionRequestClient} instance in tests. + */ +public class TestingConnectionManager implements ConnectionManager { + + @Override + public void start() {} + + @Override + public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) { + return new TestingPartitionRequestClient(); + } + + @Override + public void closeOpenChannelConnections(ConnectionID connectionId) {} + + @Override + public int getNumberOfActiveConnections() { + return 0; + } + + @Override + public int getDataPort() { + return -1; + } + + @Override + public void shutdown() {} +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java new file mode 100644 index 000..68abf63 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +/** + * A dummy implementation of the {@link PartitionRequestClient} instance which is mainly used + * for tests to avoid mock. + */ +public class TestingPartitionRequestClient implements PartitionRequestClient { + + @Override + public void requestSubpartition( + ResultPartitionID partitionId, + int subpartitionIndex, + RemoteInputChannel inputChannel, + int delayMs) { + } + + @Override + public void notifyCreditAvailable(RemoteInputChannel inputChannel) { + } + + @Override
[flink] 07/08: [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5e72b3419ea1bcc59c3e93c878bb49de7f0c503c Author: Zhijiang AuthorDate: Thu May 16 15:35:26 2019 +0800 [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side The new proposed PartitionException would cover all the cases of consuming partition failure which causes consumer failed, then JM decides to restart the producer based on this exception. [FLINK-6227][network] (part 2)Make current PartitionNotFoundException extend PartitionException This closes #8242. --- ...FoundException.java => PartitionException.java} | 19 +--- .../partition/PartitionNotFoundException.java | 17 ++ .../network/partition/consumer/InputChannel.java | 4 .../partition/consumer/SingleInputGateTest.java| 26 ++ 4 files changed, 43 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java similarity index 68% copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java index 2f78816..75f1cf1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java @@ -21,24 +21,27 @@ package org.apache.flink.runtime.io.network.partition; import java.io.IOException; /** - * Exception for failed partition requests due to non-existing partitions. 
+ * Exception for covering all the scenarios of consuming partition failure + * which causes the consumer task failed, and the job master would decide + * whether to restart the producer based on this exception. */ -public class PartitionNotFoundException extends IOException { +public abstract class PartitionException extends IOException { private static final long serialVersionUID = 0L; private final ResultPartitionID partitionId; - public PartitionNotFoundException(ResultPartitionID partitionId) { + public PartitionException(String message, ResultPartitionID partitionId) { + this(message, partitionId, null); + } + + public PartitionException(String message, ResultPartitionID partitionId, Throwable throwable) { + super(message, throwable); + this.partitionId = partitionId; } public ResultPartitionID getPartitionId() { return partitionId; } - - @Override - public String getMessage() { - return "Partition " + partitionId + " not found."; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java index 2f78816..c4e4bd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java @@ -18,27 +18,14 @@ package org.apache.flink.runtime.io.network.partition; -import java.io.IOException; - /** * Exception for failed partition requests due to non-existing partitions. 
*/ -public class PartitionNotFoundException extends IOException { +public class PartitionNotFoundException extends PartitionException { private static final long serialVersionUID = 0L; - private final ResultPartitionID partitionId; - public PartitionNotFoundException(ResultPartitionID partitionId) { - this.partitionId = partitionId; - } - - public ResultPartitionID getPartitionId() { - return partitionId; - } - - @Override - public String getMessage() { - return "Partition " + partitionId + " not found."; + super("Partition " + partitionId + " not found.", partitionId); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index a08ecc2..c0a204b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter; imp
[flink] 01/08: [hotfix][network, tests] Add new unit tests for ResultPartitionManager#createSubpartitionView
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 222f87f35560ab4be24b11c20b4a04718968797c Author: Zhijiang AuthorDate: Wed May 15 17:29:05 2019 +0800 [hotfix][network,tests] Add new unit tests for ResultPartitionManager#createSubpartitionView It is necessary to make sure PartitionNotFoundException is thrown on producer side for flip1, so some new tests are added to cover the cases of creating subpartition view via ResultPartitionManager for registered/unregistered partitions. --- .../partition/ResultPartitionManagerTest.java | 63 ++ 1 file changed, 63 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java new file mode 100644 index 000..99fd5c9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Matchers; +import org.junit.Test; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for {@link ResultPartitionManager}. + */ +public class ResultPartitionManagerTest extends TestLogger { + + /** +* Tests that {@link ResultPartitionManager#createSubpartitionView(ResultPartitionID, int, BufferAvailabilityListener)} +* would throw {@link PartitionNotFoundException} if this partition was not registered before. +*/ + @Test + public void testThrowPartitionNotFoundException() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = PartitionTestUtils.createPartition(); + try { + partitionManager.createSubpartitionView(partition.getPartitionId(), 0, new NoOpBufferAvailablityListener()); + + fail("Should throw PartitionNotFoundException for unregistered partition."); + } catch (PartitionNotFoundException notFound) { + assertThat(partition.getPartitionId(), Matchers.is(notFound.getPartitionId())); + } + } + + /** +* Tests {@link ResultPartitionManager#createSubpartitionView(ResultPartitionID, int, BufferAvailabilityListener)} +* successful if this partition was already registered before. +*/ + @Test + public void testCreateViewForRegisteredPartition() throws Exception { + final ResultPartitionManager partitionManager = new ResultPartitionManager(); + final ResultPartition partition = PartitionTestUtils.createPartition(); + + partitionManager.registerResultPartition(partition); + partitionManager.createSubpartitionView(partition.getPartitionId(), 0, new NoOpBufferAvailablityListener()); + } +}
[flink] 04/08: [hotfix][network] Introduce PartitionRequestClient interface for creating simple client instance in tests
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0189db46316eff35d3d07f57ebcf116b1f61cea6 Author: Zhijiang AuthorDate: Mon May 20 18:55:59 2019 +0800 [hotfix][network] Introduce PartitionRequestClient interface for creating simple client instance in tests --- .../runtime/io/network/ConnectionManager.java | 2 - .../runtime/io/network/LocalConnectionManager.java | 2 - .../runtime/io/network/PartitionRequestClient.java | 70 ++ .../io/network/netty/NettyConnectionManager.java | 1 + ...lient.java => NettyPartitionRequestClient.java} | 22 +++ .../netty/PartitionRequestClientFactory.java | 21 +++ .../partition/consumer/RemoteInputChannel.java | 2 +- .../netty/ClientTransportErrorHandlingTest.java| 19 +++--- ...editBasedPartitionRequestClientHandlerTest.java | 5 +- ...t.java => NettyPartitionRequestClientTest.java} | 9 +-- .../network/partition/InputChannelTestUtils.java | 2 +- .../partition/consumer/RemoteInputChannelTest.java | 2 +- 12 files changed, 113 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index 75f39e9..c342750 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; - import java.io.IOException; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java index 46ca7fc..319a9ea 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; - /** * A connection manager implementation to bypass setup overhead for task managers running in local * execution mode. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java new file mode 100644 index 000..a215700 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import java.io.IOException; + +/** + * Client to send messages or task events via network for {@link RemoteInputChannel}. + */ +public interface PartitionRequestClient { + + /** +* Requests a remote sub partition. +* +* @param partitionId The identifier of result partition to be requested. +* @param subpartitionIndex The sub partition index in the requested result partition. +* @param inputChannel The remote input channel for requesting the sub partition. +* @param delayMs The request is scheduled within a delay time. +*/ + void requestSubpartition( + ResultPartitionID partitionId, + int subpartitionIndex, + RemoteInputChannel inputChannel, + int delayMs) throws IOException; + + /** +* Notifies available credits from one remote input channel. +* +* @param inputChannel The remote input channel who announces the available credits. +*/ + void notifyCreditAvailab
[flink] 02/08: [hotfix][network, tests] Add new unit tests for PartitionRequestServerHandler
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f78e494cb4710c916e3e7bee32c792d42f243bb2 Author: Zhijiang AuthorDate: Wed May 15 17:36:32 2019 +0800 [hotfix][network,tests] Add new unit tests for PartitionRequestServerHandler It is necessary to make sure the network server would not transform or swallow the PartitionNotFoundException thrown by ResultPartitionManager, so a new unit test is added for PartitionRequestServerHandler to cover this case. --- .../netty/PartitionRequestServerHandlerTest.java | 71 ++ 1 file changed, 71 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java new file mode 100644 index 000..23f2254 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; + +import org.junit.Test; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link PartitionRequestServerHandler}. + */ +public class PartitionRequestServerHandlerTest extends TestLogger { + + /** +* Tests that {@link PartitionRequestServerHandler} responds {@link ErrorResponse} with wrapped +* {@link PartitionNotFoundException} after receiving invalid {@link PartitionRequest}. 
+*/ + @Test + public void testResponsePartitionNotFoundException() { + final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( + new ResultPartitionManager(), + new TaskEventDispatcher(), + new PartitionRequestQueue(), + true); + final EmbeddedChannel channel = new EmbeddedChannel(serverHandler); + final ResultPartitionID partitionId = new ResultPartitionID(); + + // Write the message of partition request to server + channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2)); + channel.runPendingTasks(); + + // Read the response message after handling partition request + final Object msg = channel.readOutbound(); + assertThat(msg, instanceOf(ErrorResponse.class)); + + final ErrorResponse err = (ErrorResponse) msg; + assertThat(err.cause, instanceOf(PartitionNotFoundException.class)); + + final ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId(); + assertThat(partitionId, is(actualPartitionId)); + } +}
[flink] branch master updated (2f39736 -> 49e1832)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2f39736 [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions new 222f87f [hotfix][network,tests] Add new unit tests for ResultPartitionManager#createSubpartitionView new f78e494 [hotfix][network,tests] Add new unit tests for PartitionRequestServerHandler new 22f87ee [hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition new 0189db4 [hotfix][network] Introduce PartititonRequestClient interface for creating simple client instance in tests new 9f2338c [hotfix][network,tests] Add new unit test for RemoteInputChannel#retriggerSubpartitionRequest new 6d0b6b9 [hotfix][coordination] Refactor PartitionException to PartitionUpdateException new 5e72b34 [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side new 49e1832 [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer The 8 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../runtime/io/network/ConnectionManager.java | 2 - .../runtime/io/network/LocalConnectionManager.java | 2 - .../runtime/io/network/PartitionRequestClient.java | 70 + .../io/network/netty/NettyConnectionManager.java | 1 + ...lient.java => NettyPartitionRequestClient.java} | 22 --- .../netty/PartitionRequestClientFactory.java | 21 --- ...FoundException.java => PartitionException.java} | 19 +++--- .../partition/PartitionNotFoundException.java | 17 + .../network/partition/consumer/InputChannel.java | 4 ++ .../consumer/PartitionConnectionException.java}| 18 +++--- .../partition/consumer/RemoteInputChannel.java | 10 ++- .../partition/consumer/SingleInputGate.java| 5 ++ .../flink/runtime/taskexecutor/TaskExecutor.java | 4 +- ...xception.java => PartitionUpdateException.java} | 8 +-- .../io/network/TestingConnectionManager.java} | 15 +++-- .../io/network/TestingPartitionRequestClient.java} | 35 +++ .../netty/ClientTransportErrorHandlingTest.java| 19 +++--- ...editBasedPartitionRequestClientHandlerTest.java | 5 +- ...t.java => NettyPartitionRequestClientTest.java} | 9 +-- .../netty/PartitionRequestServerHandlerTest.java | 71 + .../network/partition/InputChannelTestUtils.java | 2 +- .../partition/ResultPartitionManagerTest.java | 63 +++ .../partition/consumer/LocalInputChannelTest.java | 72 ++ .../partition/consumer/RemoteInputChannelTest.java | 51 ++- .../partition/consumer/SingleInputGateTest.java| 26 .../taskexecutor/TaskExecutorSubmissionTest.java | 4 +- 26 files changed, 469 insertions(+), 106 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/{PartitionRequestClient.java => NettyPartitionRequestClient.java} (94%) copy flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/{PartitionNotFoundException.java => PartitionException.java} (68%) copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{jobmanager/PartitionProducerDisposedException.java => io/network/partition/consumer/PartitionConnectionException.java} (62%) rename flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/{PartitionException.java => PartitionUpdateException.java} (82%) copy flink-runtime/src/{main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java => test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java} (77%) copy flink-runtime/src/{main/java/org/apache/flink/runtime/io/network/TaskEventPublisher.java => test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java} (57%) rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/{PartitionRequestClientTest.java => NettyPartitionRequestClientTest.java} (95%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java
[flink] 03/08: [hotfix][network, tests] Add new unit test for LocalInputChannel#requestSubpartition
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 22f87ee9761b06eeccf8c5adcbd9f4aa96803302 Author: Zhijiang AuthorDate: Thu May 16 16:53:51 2019 +0800 [hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition It is necessary for flip1 to make sure the PartitionNotFoundException would be thrown by LocalInputChannel#requestSubpartition if the partition was not registered in ResultPartitionManager before. So a new unit test is added to cover this case. --- .../partition/consumer/SingleInputGate.java| 5 ++ .../partition/consumer/LocalInputChannelTest.java | 72 ++ 2 files changed, 77 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 6c23698..63504bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate { } } + @VisibleForTesting + Timer getRetriggerLocalRequestTimer() { + return retriggerLocalRequestTimer; + } + @Override public void close() throws IOException { boolean released = false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 505f792..a3bc696 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -39,6 +39,7 @@ 
import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.hamcrest.Matchers; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -60,6 +61,8 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -247,6 +250,75 @@ public class LocalInputChannelTest { } /** +* Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException} +* if the result partition was not registered in {@link ResultPartitionManager} and no backoff. +*/ + @Test + public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(1); + final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager()); + + try { + localChannel.requestSubpartition(0); + + fail("Should throw a PartitionNotFoundException."); + } catch (PartitionNotFoundException notFound) { + assertThat(localChannel.getPartitionId(), Matchers.is(notFound.getPartitionId())); + } + } + + /** +* Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered +* after {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException} +* within backoff. 
+*/ + @Test + public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(1); + final LocalInputChannel localChannel = createLocalInputChannel( + inputGate, new ResultPartitionManager(), 1, 1); + + inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel); + localChannel.requestSubpartition(0); + + // The timer should be initialized at the first time of retriggering partition request. + assertNotNull(inputGate.getRetriggerLocalRequestTimer()); + } + + /** +* Tests that {@link LocalInputChannel#retriggerSubparti
[flink] 08/08: [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 49e1832056483d17f540a515cc5be7be1654dd48 Author: Zhijiang AuthorDate: Wed May 22 14:45:46 2019 +0800 [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer If the consumer cannot establish a connection to the remote task executor while requesting a remote subpartition, this might indicate that the remote task executor is not reachable. We could wrap this connection exception into the newly proposed PartitionConnectionException, which also extends PartitionException; the job master would then decide whether to restart the upstream region to re-produce the partition data. This closes #8509. --- .../consumer/PartitionConnectionException.java}| 35 +++--- .../partition/consumer/RemoteInputChannel.java | 8 +++-- .../io/network/TestingConnectionManager.java | 4 ++- .../partition/consumer/RemoteInputChannelTest.java | 24 +++ 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java similarity index 54% copy from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java index 822314d..4713dfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java @@ -16,35 +16,20 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.io.network; +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.io.network.partition.PartitionException; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; /** - * A dummy implementation of the {@link ConnectionManager} which is mainly used for creating - * {@link PartitionRequestClient} instance in tests. + * Exception for failed partition requests due to connection failure + * with unreachable producer. */ -public class TestingConnectionManager implements ConnectionManager { - - @Override - public void start() {} - - @Override - public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) { - return new TestingPartitionRequestClient(); - } +public class PartitionConnectionException extends PartitionException { - @Override - public void closeOpenChannelConnections(ConnectionID connectionId) {} + private static final long serialVersionUID = 0L; - @Override - public int getNumberOfActiveConnections() { - return 0; + public PartitionConnectionException(ResultPartitionID partitionId, Throwable throwable) { + super("Connection for partition " + partitionId + " not reachable.", partitionId, throwable); } - - @Override - public int getDataPort() { - return -1; - } - - @Override - public void shutdown() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index fabc495..2d174ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -161,8 +161,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, public void requestSubpartition(int subpartitionIndex) 
throws IOException, InterruptedException { if (partitionRequestClient == null) { // Create a client and request the partition - partitionRequestClient = connectionManager - .createPartitionRequestClient(connectionId); + try { + partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId); + } catch (IOException e) { + // IOExceptions indicate that we could not open a connection to the remote TaskExecutor + throw new PartitionConnectionException(partitionId, e); + } partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
[flink] 06/08: [hotfix][coordination] Refactor PartitionException to PartitionUpdateException
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6d0b6b9af462432aef17d753f2bd8c3b8e04b0a1 Author: Zhijiang AuthorDate: Fri May 17 12:07:07 2019 +0800 [hotfix][coordination] Refactor PartitionException to PartitionUpdateException The current usage of PartitionException is only for describing that the RPC to update partitions failed, so renaming the exception does not have any other effects. In FLIP-1, PartitionException is used to represent all the cases that indicate the producer should be restarted when the consumer fails. --- .../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 4 ++-- .../{PartitionException.java => PartitionUpdateException.java}| 8 .../flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java| 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index b35d65e..dcfabbf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -80,7 +80,7 @@ import org.apache.flink.runtime.state.TaskLocalStateStore; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; -import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException; +import org.apache.flink.runtime.taskexecutor.exceptions.PartitionUpdateException; import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; @@ -637,7 +637,7 @@ 
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { }); } else { return FutureUtils.completedExceptionally( - new PartitionException("No reader with ID " + intermediateResultPartitionID + + new PartitionUpdateException("No reader with ID " + intermediateResultPartitionID + " for task " + executionAttemptID + " was found.")); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionUpdateException.java similarity index 82% rename from flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionUpdateException.java index eecd0ae..fa12426 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionUpdateException.java @@ -23,19 +23,19 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor; /** * Exception indicating a problem with the result partitions on the {@link TaskExecutor} side. 
*/ -public class PartitionException extends TaskManagerException { +public class PartitionUpdateException extends TaskManagerException { private static final long serialVersionUID = 6248696963418276618L; - public PartitionException(String message) { + public PartitionUpdateException(String message) { super(message); } - public PartitionException(String message, Throwable cause) { + public PartitionUpdateException(String message, Throwable cause) { super(message, cause); } - public PartitionException(Throwable cause) { + public PartitionUpdateException(Throwable cause) { super(cause); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java index 711256e..cde7259 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java @@ -53,7 +53,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatew
[flink] branch master updated: [FLINK-12236][hive] Support Hive function in HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f74bbec [FLINK-12236][hive] Support Hive function in HiveCatalog f74bbec is described below commit f74bbec1794a8a836d2ffc8f248e032610f35157 Author: bowen.li AuthorDate: Tue May 21 20:54:59 2019 -0700 [FLINK-12236][hive] Support Hive function in HiveCatalog This PR creates HiveCatalogFunction class and adds support for Hive function operations in HiveCatalog. This closes #8507. --- .../flink/table/catalog/hive/HiveCatalog.java | 27 ++-- .../table/catalog/hive/HiveCatalogFunction.java| 49 + .../catalog/hive/HiveCatalogHiveMetadataTest.java | 51 +- 3 files changed, 74 insertions(+), 53 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 22fe08b..1f0bddc 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -659,6 +659,8 @@ public class HiveCatalog implements Catalog { Function hiveFunction; if (function instanceof GenericCatalogFunction) { hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) function); + } else if (function instanceof HiveCatalogFunction) { + hiveFunction = instantiateHiveFunction(functionPath, (HiveCatalogFunction) function); } else { throw new CatalogException( String.format("Unsupported catalog function type %s", function.getClass().getName())); @@ -694,8 +696,10 @@ public class HiveCatalog implements Catalog { } Function hiveFunction; - if (existingFunction instanceof GenericCatalogFunction && newFunction instanceof 
GenericCatalogFunction) { - hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction); + if (newFunction instanceof GenericCatalogFunction) { + hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction); + } else if (newFunction instanceof HiveCatalogFunction) { + hiveFunction = instantiateHiveFunction(functionPath, (HiveCatalogFunction) newFunction); } else { throw new CatalogException( String.format("Unsupported catalog function type %s", newFunction.getClass().getName())); @@ -758,7 +762,7 @@ public class HiveCatalog implements Catalog { Function function = client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName()); if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) { - // TODO: extract more properties from Hive function and add to CatalogFunction's properties + // TODO: extract more properties from Hive function and add to GenericCatalogFunction's properties Map properties = new HashMap<>(); properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE); @@ -766,7 +770,9 @@ public class HiveCatalog implements Catalog { return new GenericCatalogFunction( function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()), properties); } else { - throw new CatalogException("Hive function is not supported yet"); + // TODO: extract more properties from Hive function and add to HiveCatalogFunction's properties + + return new HiveCatalogFunction(function.getClassName()); } } catch (NoSuchObjectException e) { throw new FunctionNotExistException(catalogName, functionPath, e); @@ -803,6 +809,19 @@ public class HiveCatalog implements Catalog { ); } + private static Function instantiateHiveFunction(ObjectPath functionPath, HiveCatalogFunction function) { + return new Function( + functionPath.getObjectName(), + functionPath.getData
Buildbot failure on flink-docs-master
The Buildbot has detected a new failure on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/1480 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave2_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: BUILD FAILED: failed Build docs Sincerely, -The Buildbot
[flink] branch master updated: [FLINK-12235][hive] Support partition related operations in HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 95e1686 [FLINK-12235][hive] Support partition related operations in HiveCatalog 95e1686 is described below commit 95e16862822922972f70a710bc09b7ec31c9043b Author: Rui Li AuthorDate: Wed May 15 19:44:06 2019 +0800 [FLINK-12235][hive] Support partition related operations in HiveCatalog This PR adds support for Hive partitions in HiveCatalog. This closes #8449. --- .../flink/table/catalog/hive/HiveCatalog.java | 270 ++- .../table/catalog/hive/HiveCatalogPartition.java | 61 .../hive/HiveCatalogGenericMetadataTest.java | 108 ++ .../catalog/hive/HiveCatalogHiveMetadataTest.java | 18 + .../table/catalog/GenericInMemoryCatalog.java | 82 ++--- .../table/catalog/GenericInMemoryCatalogTest.java | 371 +--- .../org/apache/flink/table/catalog/Catalog.java| 2 +- .../flink/table/catalog/CatalogTestBase.java | 379 + .../flink/table/catalog/CatalogTestUtil.java | 4 - 9 files changed, 859 insertions(+), 436 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 1f0bddc..d2387f0 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.GenericCatalogDatabase; import org.apache.flink.table.catalog.GenericCatalogFunction; +import org.apache.flink.table.catalog.GenericCatalogPartition; import 
org.apache.flink.table.catalog.GenericCatalogTable; import org.apache.flink.table.catalog.GenericCatalogView; import org.apache.flink.table.catalog.GenericInMemoryCatalog; @@ -53,6 +54,7 @@ import org.apache.flink.util.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; @@ -63,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -500,7 +503,7 @@ public class HiveCatalog implements Catalog { // Partition keys List partitionKeys = new ArrayList<>(); if (!hiveTable.getPartitionKeys().isEmpty()) { - partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList()); + partitionKeys = getFieldNames(hiveTable.getPartitionKeys()); } if (isView) { @@ -608,44 +611,285 @@ public class HiveCatalog implements Catalog { // -- partitions -- @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + + try { + return getHivePartition(tablePath, partitionSpec) != null; + } catch (NoSuchObjectException | TableNotExistException | 
PartitionSpecInvalidException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e); + } + } + + @Override public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsExcept
[flink] branch master updated: [FLINK-12592][python] Add `--force` for python install.
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 873bb4d [FLINK-12592][python] Add `--force` for python install. 873bb4d is described below commit 873bb4d6388194267656f40528fea8d2c6f1d450 Author: sunjincheng121 AuthorDate: Fri May 24 12:33:27 2019 +0800 [FLINK-12592][python] Add `--force` for python install. This closes #8525 --- flink-python/tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-python/tox.ini b/flink-python/tox.ini index 23d250a..b8d079f 100644 --- a/flink-python/tox.ini +++ b/flink-python/tox.ini @@ -28,7 +28,7 @@ deps = pytest commands = python --version -python setup.py install +python setup.py install --force pytest [flake8]