[flink] 06/06: [FLINK-12254][table-common] Add a converter between old type information behavior and data type

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 185ed0e2350b132bbec20b54e6cae6fe72710eae
Author: Timo Walther 
AuthorDate: Tue May 21 15:45:47 2019 +0200

    [FLINK-12254][table-common] Add a converter between old type information behavior and data type
---
 .../types/logical/LegacyTypeInformationType.java   | 120 +++
 .../utils/LegacyTypeInfoDataTypeConverter.java | 352 +
 .../flink/table/types/utils/TypeConversions.java   |  85 +
 .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 +
 4 files changed, 704 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
new file mode 100644
index 0000000..a6d6a20
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This type is a temporary solution to fully support the old type system stack through the new
+ * stack. Many types can be mapped directly to the new type system, however, some types such as
+ * {@code DECIMAL}, POJOs, or case classes need special handling.
+ *
+ * <p>This type differs from {@link TypeInformationAnyType}. This type is allowed to travel through
+ * the stack whereas {@link TypeInformationAnyType} should be resolved eagerly to {@link AnyType} by
+ * the planner.
+ *
+ * <p>This class can be removed once we have removed all deprecated methods that take or return
+ * {@link TypeInformation}.
+ *
+ * @see LegacyTypeInfoDataTypeConverter
+ */
+@Internal
+public final class LegacyTypeInformationType<T> extends LogicalType {
+
+	private static final String FORMAT = "LEGACY(%s)";
+
+	private final TypeInformation<T> typeInfo;
+
+	public LegacyTypeInformationType(LogicalTypeRoot logicalTypeRoot, TypeInformation<T> typeInfo) {
+		super(true, logicalTypeRoot);
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+	}
+
+	public TypeInformation<T> getTypeInformation() {
+		return typeInfo;
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new LegacyTypeInformationType<>(getTypeRoot(), typeInfo);
+	}
+
+	@Override
+	public String asSerializableString() {
+		throw new TableException("Legacy type information has no serializable string representation.");
+	}
+
+	@Override
+	public String asSummaryString() {
+		return withNullability(FORMAT, typeInfo);
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		return typeInfo.getTypeClass().isAssignableFrom(clazz);
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		return clazz.isAssignableFrom(typeInfo.getTypeClass());
+	}
+
+	@Override
+	public Class<T> getDefaultConversion() {
+		return typeInfo.getTypeClass();
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getCla
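The listing above is cut off by the archive mid-way through `equals()`. Independent of Flink, the asymmetry between `supportsInputConversion` and `supportsOutputConversion` can be illustrated with plain reflection; the class and method names below are hypothetical, not part of the commit:

```java
public class ConversionChecks {
    // Mirrors the logic above: input values may be a subclass of the type's
    // class, while output targets may be a superclass of it.
    static boolean supportsInput(Class<?> typeClass, Class<?> clazz) {
        return typeClass.isAssignableFrom(clazz);   // accepts instances of subclasses
    }

    static boolean supportsOutput(Class<?> typeClass, Class<?> clazz) {
        return clazz.isAssignableFrom(typeClass);   // emits to superclass references
    }

    public static void main(String[] args) {
        // java.sql.Timestamp extends java.util.Date
        System.out.println(supportsInput(java.util.Date.class, java.sql.Timestamp.class));  // true
        System.out.println(supportsOutput(java.util.Date.class, Object.class));             // true
        System.out.println(supportsInput(java.sql.Timestamp.class, java.util.Date.class));  // false
    }
}
```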

[flink] branch master updated (7a3f081 -> 185ed0e)

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 7a3f081  [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation
 new 36fef44  [hotfix][table-common] Add default precision temporal data types
 new 1ec14eb  [hotfix][table-common] Add assumption about expressions and data types
 new a71c200  [hotfix][table-common] Fix equality of data types with same conversion class
 new d7d2442  [hotfix][table-common] Add logical type check utilities
 new 3b13b60  [hotfix][table-common] Fix invalid class to data type conversion
 new 185ed0e  [FLINK-12254][table-common] Add a converter between old type information behavior and data type

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/table/api/DataTypes.java |  83 +
 .../apache/flink/table/expressions/Expression.java |   6 +-
 .../flink/table/types/CollectionDataType.java  |  26 +-
 .../org/apache/flink/table/types/DataType.java |  20 +-
 .../types/logical/LegacyTypeInformationType.java   | 120 +++
 .../types/logical/utils/LogicalTypeChecks.java |  87 +
 .../table/types/utils/ClassDataTypeConverter.java  |   2 +-
 .../utils/LegacyTypeInfoDataTypeConverter.java | 352 +
 .../flink/table/types/utils/TypeConversions.java   |  85 +
 .../org/apache/flink/table/types/DataTypeTest.java |   6 +
 .../apache/flink/table/types/DataTypesTest.java|  12 +
 .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 +
 12 files changed, 926 insertions(+), 20 deletions(-)
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
 create mode 100644 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java



[flink] 02/06: [hotfix][table-common] Add assumption about expressions and data types

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ec14eb44db7b6ea28f560eb69c6d016890bdb70
Author: Timo Walther 
AuthorDate: Tue May 21 12:23:20 2019 +0200

[hotfix][table-common] Add assumption about expressions and data types
---
 .../main/java/org/apache/flink/table/expressions/Expression.java| 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
index 6b68dce..57c73d4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
 
 import java.util.List;
 
@@ -26,8 +27,11 @@ import java.util.List;
  * The interface for all expressions.
  *
  * Expressions represent a logical tree for producing a computation result. Every expression
- * consists of zero or more sub-expressions. Expressions might be literal values, function calls,
+ * consists of zero, one, or more sub-expressions. Expressions might be literal values, function calls,
  * or field references.
+ *
+ * <p>Expressions are part of the API. Thus, values and return types are expressed as instances of
+ * {@link DataType}.
  */
 @PublicEvolving
 public interface Expression {
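The updated javadoc describes expressions as a logical tree with zero, one, or more sub-expressions. A minimal, Flink-independent sketch of that composite shape (all names below are hypothetical, not the real API):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ExprDemo {
    // Minimal composite: every expression exposes its children; leaves return an empty list.
    interface Expr {
        List<Expr> getChildren();
    }

    static final class Literal implements Expr {
        final Object value;
        Literal(Object value) { this.value = value; }
        public List<Expr> getChildren() { return Collections.emptyList(); }
    }

    static final class Call implements Expr {
        final String function;
        final List<Expr> args;
        Call(String function, Expr... args) {
            this.function = function;
            this.args = Arrays.asList(args);
        }
        public List<Expr> getChildren() { return args; }
    }

    // Count nodes by walking the tree, as a planner traversal would.
    static int size(Expr e) {
        int n = 1;
        for (Expr child : e.getChildren()) {
            n += size(child);
        }
        return n;
    }

    public static void main(String[] args) {
        Expr plus = new Call("plus", new Literal(1), new Literal(2));
        System.out.println(size(plus)); // 3
    }
}
```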



[flink] 01/06: [hotfix][table-common] Add default precision temporal data types

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36fef4457a7f1de47989c8a2485581bcf8633b32
Author: Timo Walther 
AuthorDate: Tue May 21 12:19:04 2019 +0200

[hotfix][table-common] Add default precision temporal data types
---
 .../java/org/apache/flink/table/api/DataTypes.java | 83 ++
 .../apache/flink/table/types/DataTypesTest.java| 12 
 2 files changed, 95 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 2591227..dc7c319 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -240,6 +240,7 @@ public final class DataTypes {
 * Compared to the SQL standard, leap seconds (23:59:60 and 
23:59:61) are not supported as the
 * semantics are closer to {@link java.time.LocalTime}. A time WITH 
time zone is not provided.
 *
+* @see #TIME()
 * @see TimeType
 */
public static DataType TIME(int precision) {
@@ -247,6 +248,22 @@ public final class DataTypes {
}
 
/**
+	 * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
+	 *
+	 * <p>An instance consists of {@code hour:minute:second} with up to second precision
+	 * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
+*
+* @see #TIME(int)
+* @see TimeType
+*/
+   public static DataType TIME() {
+   return new AtomicDataType(new TimeType());
+   }
+
+   /**
 * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP(p)} 
where {@code p} is the number
 * of digits of fractional seconds (=precision). {@code p} must have a 
value between 0 and 9 (both
 * inclusive).
@@ -267,6 +284,26 @@ public final class DataTypes {
}
 
/**
+	 * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP} with 6 digits of fractional seconds
+	 * by default.
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional]} with up to
+	 * microsecond precision and values ranging from {@code 0000-01-01 00:00:00.000000} to
+	 * {@code 9999-12-31 23:59:59.999999}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalDateTime}.
+*
+* @see #TIMESTAMP(int)
+* @see #TIMESTAMP_WITH_TIME_ZONE(int)
+* @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+* @see TimestampType
+*/
+   public static DataType TIMESTAMP() {
+   return new AtomicDataType(new TimestampType());
+   }
+
+   /**
 * Data type of a timestamp WITH time zone {@code TIMESTAMP(p) WITH 
TIME ZONE} where {@code p} is
 * the number of digits of fractional seconds (=precision). {@code p} 
must have a value between 0
 * and 9 (both inclusive).
@@ -287,6 +324,26 @@ public final class DataTypes {
}
 
/**
+	 * Data type of a timestamp WITH time zone {@code TIMESTAMP WITH TIME ZONE} with 6 digits of
+	 * fractional seconds by default.
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to microsecond precision and values ranging from {@code 0000-01-01 00:00:00.000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999 -14:59}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.OffsetDateTime}.
+*
+* @see #TIMESTAMP_WITH_TIME_ZONE(int)
+* @see #TIMESTAMP(int)
+* @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+* @see ZonedTimestampType
+*/
+   public static DataType TIMESTAMP_WITH_TIME_ZONE() {
+   return new AtomicDataType(new ZonedTimestampType());
+   }
+
+   /**
 * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP(p) 
WITH LOCAL TIME ZONE} where
 * {@code p} is the number of digits of fractional seconds 
(=precision). {@code p} must have a value
 * between 0 and 9 (both inclusive).
@@ -313,6 +370,32 @@ public final class DataTypes {
}
 
/**
+* Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP WITH 
LOCAL TIM
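The new default-precision timestamp types keep 6 fractional digits (microseconds), while `java.time` values carry up to nanoseconds. A small standalone sketch of what truncating to that default precision means:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class PrecisionDemo {
    public static void main(String[] args) {
        // A TIMESTAMP with default precision 6 keeps microseconds;
        // java.time.LocalDateTime itself carries up to nanoseconds (precision 9).
        LocalDateTime nanos = LocalDateTime.of(2019, 5, 21, 15, 45, 47, 123_456_789);
        LocalDateTime micros = nanos.truncatedTo(ChronoUnit.MICROS);
        System.out.println(nanos);  // 2019-05-21T15:45:47.123456789
        System.out.println(micros); // 2019-05-21T15:45:47.123456
    }
}
```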

[flink] 03/06: [hotfix][table-common] Fix equality of data types with same conversion class

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a71c200b470ffe7ae614870d5855d1e24673c08b
Author: Timo Walther 
AuthorDate: Tue May 21 13:35:46 2019 +0200

[hotfix][table-common] Fix equality of data types with same conversion class
---
 .../flink/table/types/CollectionDataType.java  | 26 +-
 .../org/apache/flink/table/types/DataType.java | 20 +++--
 .../org/apache/flink/table/types/DataTypeTest.java |  6 +
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 17b7096..b50fea9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -43,7 +43,7 @@ public final class CollectionDataType extends DataType {
LogicalType logicalType,
@Nullable Class conversionClass,
DataType elementDataType) {
-		super(logicalType, conversionClass);
+		super(logicalType, ensureArrayConversionClass(logicalType, elementDataType, conversionClass));
 		this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
}
 
@@ -82,16 +82,6 @@ public final class CollectionDataType extends DataType {
}
 
@Override
-	public Class<?> getConversionClass() {
-		// arrays are a special case because their default conversion class depends on the
-		// conversion class of the element type
-		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) {
-			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
-		}
-		return super.getConversionClass();
-	}
-
-   @Override
public  R accept(DataTypeVisitor visitor) {
return visitor.visit(this);
}
@@ -115,4 +105,18 @@ public final class CollectionDataType extends DataType {
public int hashCode() {
return Objects.hash(super.hashCode(), elementDataType);
}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static Class<?> ensureArrayConversionClass(
+			LogicalType logicalType,
+			DataType elementDataType,
+			@Nullable Class<?> clazz) {
+		// arrays are a special case because their default conversion class depends on the
+		// conversion class of the element type
+		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) {
+			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+		}
+		return clazz;
+	}
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
index 303a052..6b783e44 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -52,13 +52,15 @@ import java.util.Objects;
 @PublicEvolving
 public abstract class DataType implements Serializable {
 
-   protected LogicalType logicalType;
+   protected final LogicalType logicalType;
 
-	protected @Nullable Class<?> conversionClass;
+	protected final Class<?> conversionClass;
 
 	DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
 		this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null.");
-		this.conversionClass = performEarlyClassValidation(logicalType, conversionClass);
+		this.conversionClass = performEarlyClassValidation(
+			logicalType,
+			ensureConversionClass(logicalType, conversionClass));
 	}
 
/**
@@ -79,9 +81,6 @@ public abstract class DataType implements Serializable {
 * @return the expected conversion class
 */
 	public Class<?> getConversionClass() {
-   if (conversionClass == null) {
-   return logicalType.getDefaultConversion();
-   }
return conversionClass;
}
 
@@ -133,7 +132,7 @@ public abstract class DataType implements Serializable {
}
DataType dataType = (DataType) o;

[flink] 04/06: [hotfix][table-common] Add logical type check utilities

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d7d24425e47408eb094b7f5aa6ecfcffd7f89db7
Author: Timo Walther 
AuthorDate: Tue May 21 14:42:02 2019 +0200

[hotfix][table-common] Add logical type check utilities
---
 .../types/logical/utils/LogicalTypeChecks.java | 87 ++
 1 file changed, 87 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
new file mode 100644
index 0000000..3958922
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+/**
+ * Utilities for checking {@link LogicalType}.
+ */
+@Internal
+public final class LogicalTypeChecks {
+
+	private static final TimeAttributeChecker TIME_ATTRIBUTE_CHECKER = new TimeAttributeChecker();
+
+	public static boolean hasRoot(LogicalType logicalType, LogicalTypeRoot typeRoot) {
+		return logicalType.getTypeRoot() == typeRoot;
+	}
+
+	public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
+		return logicalType.getTypeRoot().getFamilies().contains(family);
+	}
+
+	public static boolean isTimeAttribute(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) != TimestampKind.REGULAR;
+	}
+
+	public static boolean isRowtimeAttribute(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.ROWTIME;
+	}
+
+	public static boolean isProctimeAttribute(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.PROCTIME;
+	}
+
+   private LogicalTypeChecks() {
+   // no instantiation
+   }
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class TimeAttributeChecker extends LogicalTypeDefaultVisitor<TimestampKind> {
+
+   @Override
+   public TimestampKind visit(TimestampType timestampType) {
+   return timestampType.getKind();
+   }
+
+   @Override
+		public TimestampKind visit(ZonedTimestampType zonedTimestampType) {
+			return zonedTimestampType.getKind();
+   }
+
+   @Override
+		public TimestampKind visit(LocalZonedTimestampType localZonedTimestampType) {
+   return localZonedTimestampType.getKind();
+   }
+
+   @Override
+   protected TimestampKind defaultMethod(LogicalType logicalType) {
+   // we don't verify that type is actually a timestamp
+   return TimestampKind.REGULAR;
+   }
+   }
+}
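`TimeAttributeChecker` relies on a visitor with a default method: timestamp types report their kind, everything else falls back to `REGULAR`. A self-contained sketch of that pattern (types and names below are simplified stand-ins, not the Flink classes):

```java
public class VisitorDemo {
    enum Kind { REGULAR, ROWTIME, PROCTIME }

    interface Type {
        <R> R accept(Visitor<R> v);
    }

    interface Visitor<R> {
        R visit(TimestampType t);
        R visitDefault(Type t);     // plays the role of defaultMethod above
    }

    static final class TimestampType implements Type {
        final Kind kind;
        TimestampType(Kind kind) { this.kind = kind; }
        public <R> R accept(Visitor<R> v) { return v.visit(this); }
    }

    static final class IntType implements Type {
        public <R> R accept(Visitor<R> v) { return v.visitDefault(this); }
    }

    // Like TIME_ATTRIBUTE_CHECKER: timestamps report their kind, everything else REGULAR.
    static final Visitor<Kind> KIND_CHECKER = new Visitor<Kind>() {
        public Kind visit(TimestampType t) { return t.kind; }
        public Kind visitDefault(Type t) { return Kind.REGULAR; }
    };

    static boolean isTimeAttribute(Type t) {
        return t.accept(KIND_CHECKER) != Kind.REGULAR;
    }

    public static void main(String[] args) {
        System.out.println(isTimeAttribute(new TimestampType(Kind.ROWTIME))); // true
        System.out.println(isTimeAttribute(new IntType()));                   // false
    }
}
```

The default method keeps the checker total over the whole type hierarchy without enumerating every non-timestamp type.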



[flink] 05/06: [hotfix][table-common] Fix invalid class to data type conversion

2019-05-23 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b13b60dc127d075b378d2f2e0a02b1b738c1245
Author: Timo Walther 
AuthorDate: Tue May 21 14:44:20 2019 +0200

[hotfix][table-common] Fix invalid class to data type conversion
---
 .../java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
index 599c8b2..a71c682 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
@@ -56,7 +56,7 @@ public final class ClassDataTypeConverter {
addDefaultDataType(double.class, DataTypes.DOUBLE());
addDefaultDataType(java.sql.Date.class, DataTypes.DATE());
addDefaultDataType(java.time.LocalDate.class, DataTypes.DATE());
-   addDefaultDataType(java.sql.Time.class, DataTypes.TIME(9));
+   addDefaultDataType(java.sql.Time.class, DataTypes.TIME(3));
 		addDefaultDataType(java.time.LocalTime.class, DataTypes.TIME(9));
 		addDefaultDataType(java.sql.Timestamp.class, DataTypes.TIMESTAMP(9));
 		addDefaultDataType(java.time.LocalDateTime.class, DataTypes.TIMESTAMP(9));
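The fix maps `java.sql.Time` to `TIME(3)` because it only stores milliseconds, whereas `java.time.LocalTime` resolves to nanoseconds and therefore keeps `TIME(9)`. A quick standalone check of those precisions:

```java
import java.sql.Time;
import java.time.LocalTime;

public class TimePrecisionDemo {
    public static void main(String[] args) {
        // java.sql.Time wraps a millisecond epoch value -> at most 3 fractional digits.
        Time sqlTime = new Time(12 * 3600_000L + 34 * 60_000L + 56_000L + 789L);
        System.out.println(sqlTime.getTime() % 1000);  // 789 (milliseconds only)

        // java.time.LocalTime resolves down to nanoseconds -> up to 9 fractional digits.
        LocalTime localTime = LocalTime.of(12, 34, 56, 123_456_789);
        System.out.println(localTime.getNano());       // 123456789
    }
}
```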



[flink-shaded] branch master updated: [hotfix][hadoop] Use o.a.f.shaded as patternPrefix

2019-05-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git


The following commit(s) were added to refs/heads/master by this push:
 new 3746069  [hotfix][hadoop] Use o.a.f.shaded as patternPrefix
3746069 is described below

commit 37460692afceacce6a6df9fe056428f873c109d9
Author: Chesnay Schepler 
AuthorDate: Thu May 23 10:56:40 2019 +0200

[hotfix][hadoop] Use o.a.f.shaded as patternPrefix
---
 flink-shaded-hadoop-2/pom.xml | 16 
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/flink-shaded-hadoop-2/pom.xml b/flink-shaded-hadoop-2/pom.xml
index 3cbf606..91451be 100644
--- a/flink-shaded-hadoop-2/pom.xml
+++ b/flink-shaded-hadoop-2/pom.xml
@@ -1026,35 +1026,35 @@ under the License.



 						<pattern>org.objectweb.asm</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.objectweb.asm</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.objectweb.asm</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>org.jboss.netty</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.jboss.netty</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.jboss.netty</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>io.netty</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.io.netty</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.io.netty</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>org.apache.curator</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.apache.curator</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.apache.curator</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>org.apache.http</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.apache.http</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.apache.http</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>org.apache.commons.httpclient</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.apache.commons.httpclient</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.apache.commons.httpclient</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>org.htrace</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.htrace</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.htrace</shadedPattern>
 					</relocation>
 					<relocation>
 						<pattern>org.codehaus.jackson</pattern>
-						<shadedPattern>org.apache.flink.hadoop2.shaded.org.codehaus.jackson</shadedPattern>
+						<shadedPattern>org.apache.flink.shaded.hadoop2.org.codehaus.jackson</shadedPattern>






[flink] branch master updated: [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase

2019-05-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 21b92ac  [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
21b92ac is described below

commit 21b92ac95f1824e9b1ca483fa3ffaaf77ef14d4a
Author: azagrebin 
AuthorDate: Thu May 23 11:35:48 2019 +0200

[FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
---
 .../executiongraph/ExecutionGraphRestartTest.java  | 709 ++---
 1 file changed, 331 insertions(+), 378 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1c5b650..1e6be72 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,9 +35,6 @@ import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,10 +42,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import 
org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -58,23 +64,18 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
-import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.crea

[flink] branch master updated: [FLINK-12586][optimizer] Reverse stderr/stdout order

2019-05-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 396e93c  [FLINK-12586][optimizer] Reverse stderr/stdout order
396e93c is described below

commit 396e93cc4c5fd4086d715234589b3a991a2c809b
Author: liyafan82 <42827532+liyafa...@users.noreply.github.com>
AuthorDate: Thu May 23 17:57:03 2019 +0800

[FLINK-12586][optimizer] Reverse stderr/stdout order
---
 .../org/apache/flink/client/program/OptimizerPlanEnvironment.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index faacd9f..d524ad9 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -104,8 +104,8 @@ public class OptimizerPlanEnvironment extends 
ExecutionEnvironment {
 
			throw new ProgramInvocationException(
				"The program plan could not be fetched - the program aborted pre-maturely."
-					+ "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout)
-					+ "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stderr));
+					+ "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
+					+ "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
}
// 
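The fix above swaps the two variables so each label is finally paired with the matching captured stream. A minimal plain-Java sketch of the corrected message assembly (class and method names here are illustrative, not the actual Flink code):

```java
public class ErrorMessageDemo {

	// Mirrors the corrected ordering: the "System.err" label now prints the
	// captured stderr buffer, and "System.out" prints the stdout buffer.
	public static String buildMessage(String stdout, String stderr) {
		return "The program plan could not be fetched - the program aborted pre-maturely."
			+ "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
			+ "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout);
	}

	public static void main(String[] args) {
		// stderr is empty here, so its section collapses to "(none)"
		System.out.println(buildMessage("plan printed", ""));
	}
}
```

Before the fix, the two label/buffer pairs were crossed, so users debugging a failed plan extraction read stdout under the "System.err" heading and vice versa.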

 



[flink] branch master updated: [hotfix][docs] Correct documentation of ExEnv#fromElements

2019-05-23 Thread chesnay

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9fde18c  [hotfix][docs] Correct documentation of ExEnv#fromElements
9fde18c is described below

commit 9fde18cc4c5bbaac8f992121fb709e9cbb97b6b8
Author: Robert Stoll 
AuthorDate: Thu May 23 12:04:21 2019 +0200

[hotfix][docs] Correct documentation of ExEnv#fromElements
---
 docs/dev/batch/index.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index 7d18bac..cb65f0b 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -829,7 +829,7 @@ File-based:
 
 Collection-based:
 
-- `fromCollection(Collection)` - Creates a data set from the Java Java.util.Collection. All elements
+- `fromCollection(Collection)` - Creates a data set from a Java.util.Collection. All elements
   in the collection must be of the same type.
 
- `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the
@@ -970,8 +970,8 @@ File-based:
 
 Collection-based:
 
-- `fromCollection(Seq)` - Creates a data set from a Seq. All elements
-  in the collection must be of the same type.
+- `fromCollection(Iterable)` - Creates a data set from an Iterable. All elements
+  returned by the Iterable must be of the same type.
 
- `fromCollection(Iterator)` - Creates a data set from an Iterator. The class specifies the
   data type of the elements returned by the iterator.



[flink] branch master updated: [FLINK-12497][network] Move ConnectionManager#start arguments to constructor

2019-05-23 Thread chesnay

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 0cb56a4  [FLINK-12497][network] Move ConnectionManager#start arguments 
to constructor
0cb56a4 is described below

commit 0cb56a4dfa9bade080730312ab94b6901c7fb91a
Author: zhijiang 
AuthorDate: Thu May 23 18:38:14 2019 +0800

[FLINK-12497][network] Move ConnectionManager#start arguments to constructor
---
 .../flink/runtime/io/network/ConnectionManager.java |  3 +--
 .../runtime/io/network/LocalConnectionManager.java  |  3 +--
 .../runtime/io/network/NetworkEnvironment.java  |  9 ++---
 .../io/network/netty/NettyConnectionManager.java| 21 +
 .../network/netty/NettyConnectionManagerTest.java   | 20 
 .../io/network/partition/InputChannelTestUtils.java |  3 +--
 6 files changed, 30 insertions(+), 29 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index a84cd8a..75f39e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
 
@@ -29,7 +28,7 @@ import java.io.IOException;
  */
 public interface ConnectionManager {
 
-	void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventDispatcher) throws IOException;
+   void start() throws IOException;
 
/**
 * Creates a {@link PartitionRequestClient} instance for the given 
{@link ConnectionID}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 11cab6b..46ca7fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 /**
  * A connection manager implementation to bypass setup overhead for task 
managers running in local
@@ -28,7 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 public class LocalConnectionManager implements ConnectionManager {
 
@Override
-	public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) {
+   public void start() {
}
 
@Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 1f2ee7e..7ee2a20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -120,8 +120,12 @@ public class NetworkEnvironment {
checkNotNull(config);
 
NettyConfig nettyConfig = config.nettyConfig();
+
+		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
ConnectionManager connectionManager = nettyConfig != null ?
-			new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager();
+			new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) :
+			new LocalConnectionManager();
 
NetworkBufferPool networkBufferPool = new NetworkBufferPool(
config.numNetworkBuffers(),
@@ -130,7 +134,6 @@ public class NetworkEnvironment {
 
registerNetworkMetrics(metricGroup, networkBufferPool);
 
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
resultPartitionManager,
ioManager,
@@ -280,7 +283,7 @@ public class NetworkEnvironment {
 
try {
LOG.debug("Starting network connection 
manager");
-   connectionManager.start(resultPartition
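The refactoring above moves the `start()` arguments into the constructor, turning `start()` into a no-arg lifecycle method. The pattern, reduced to a plain-Java sketch (the types and class names below are simplified stand-ins, not the actual Flink classes):

```java
import java.io.IOException;

// After the refactoring, start() takes no arguments; everything the
// manager needs is handed over when it is constructed.
interface ConnectionManager {
	void start() throws IOException;
}

public class NettyLikeConnectionManager implements ConnectionManager {

	private final Object partitionProvider;   // injected at construction time
	private final Object taskEventPublisher;  // instead of being passed to start()
	private boolean started;

	public NettyLikeConnectionManager(Object partitionProvider, Object taskEventPublisher) {
		this.partitionProvider = partitionProvider;
		this.taskEventPublisher = taskEventPublisher;
	}

	@Override
	public void start() throws IOException {
		// A real implementation would bootstrap the network stack here.
		started = true;
	}

	public boolean isStarted() {
		return started;
	}
}
```

Constructor injection means every implementation is fully configured at creation, so the caller (here, `NetworkEnvironment`) can invoke the same no-arg `start()` on all of them, and implementations that need no dependencies (like the local manager) simply take none.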

[flink] branch master updated: [hotfix] fix typos. 1. [flink-core] below -> above in TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in LocalStreamEnvironment methods

2019-05-23 Thread rmetzger

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new efec532  [hotfix] fix typos. 1. [flink-core] below -> above in 
TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in 
LocalStreamEnvironment methods
efec532 is described below

commit efec53212419b4558ecd6cdf822a9da92e9d2bc2
Author: zhangxin516 
AuthorDate: Thu May 16 17:45:24 2019 +0800

[hotfix] fix typos.
1. [flink-core] below -> above in TypeInformation methods
2. [flink-streaming-java] CLusterUtil -> ClusterUtil in 
LocalStreamEnvironment methods
---
 .../main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java | 2 +-
 .../apache/flink/streaming/api/environment/LocalStreamEnvironment.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 2a5f81a..565ac74 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -108,7 +108,7 @@ public abstract class TypeInformation implements 
Serializable {
 
/**
 * Gets the number of logical fields in this type. This includes its 
nested and transitively nested
-	 * fields, in the case of composite types. In the example below, the OuterType type has three
+	 * fields, in the case of composite types. In the example above, the OuterType type has three
 * fields in total.
 *
 * The total number of fields must be at least 1.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 43f1464..eca802f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -76,7 +76,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
}
 
/**
-	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
+	 * Executes the JobGraph of the on a mini cluster of ClusterUtil with a user
 * specified name.
 *
 * @param jobName



[flink] 01/02: [FLINK-12254][table] Update TableSchema to new type system

2019-05-23 Thread twalthr

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98f3046fae76405b3fcf2b5ef82835a81624498e
Author: Timo Walther 
AuthorDate: Tue May 21 16:45:27 2019 +0200

[FLINK-12254][table] Update TableSchema to new type system
---
 .../flink/table/client/cli/CliResultView.java  |   2 +-
 .../apache/flink/table/client/cli/CliUtils.java|   4 +-
 .../org/apache/flink/table/api/TableSchema.java| 140 +++--
 .../apache/flink/table/api/TableSchemaTest.scala   |   8 +-
 .../api/validation/TableSchemaValidationTest.scala |   6 +-
 5 files changed, 113 insertions(+), 47 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index 67ce5eb..407a04b 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -145,7 +145,7 @@ public abstract class CliResultView> 
extends CliView
final CliRowView view = new CliRowView(
client,
resultDescriptor.getResultSchema().getFieldNames(),
-				CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldTypes()),
+				CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldDataTypes()),
getRow(results.get(selectedRow)));
view.open(); // enter view
}
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
index 77894e8..471d168 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.client.cli;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.jline.utils.AttributedString;
@@ -103,7 +103,7 @@ public final class CliUtils {
return fields;
}
 
-   public static String[] typesToString(TypeInformation[] types) {
+   public static String[] typesToString(DataType[] types) {
final String[] typesAsString = new String[types.length];
for (int i = 0; i < types.length; i++) {
typesAsString[i] = types[i].toString();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index 385f9a0..c6804f9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -20,8 +20,9 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -33,10 +34,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.Field;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
-  * A table schema that represents a table's structure with field names and 
types.
-  */
+ * A table schema that represents a table's structure with field names and 
data types.
+ */
 @PublicEvolving
 public class TableSchema {
 
@@ -44,20 +52,20 @@ public class TableSchema {
 
private final String[] fieldNames;
 
-   private final TypeInformation[] fieldTypes;
+   private final DataType[] fieldDataTypes;
 
private final Map fieldNameToIndex;
 
-	public TableSchema(String[] fieldNames, TypeInformation[] fieldTypes) {
+   private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) {
this.fieldNames = Preconditions.checkNotNull(field
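The schema now stores `DataType`s internally and bridges to the legacy `TypeInformation` world through the statically imported `fromLegacyInfoToDataType` / `fromDataTypeToLegacyInfo` utilities. The two-way conversion idea, reduced to a plain-Java sketch with string stand-ins for the two type worlds (not Flink's actual converter logic):

```java
import java.util.HashMap;
import java.util.Map;

// Two-way lookup between legacy type-info names and new data-type names,
// standing in for fromLegacyInfoToDataType / fromDataTypeToLegacyInfo.
public class TypeBridgeDemo {

	private static final Map<String, String> LEGACY_TO_NEW = new HashMap<>();
	private static final Map<String, String> NEW_TO_LEGACY = new HashMap<>();

	static {
		// Illustrative pairs only; the real converter covers many more types.
		register("STRING_TYPE_INFO", "STRING");
		register("INT_TYPE_INFO", "INT");
	}

	private static void register(String legacy, String modern) {
		LEGACY_TO_NEW.put(legacy, modern);
		NEW_TO_LEGACY.put(modern, legacy);
	}

	public static String fromLegacyInfoToDataType(String legacy) {
		String modern = LEGACY_TO_NEW.get(legacy);
		if (modern == null) {
			throw new IllegalArgumentException("No mapping for " + legacy);
		}
		return modern;
	}

	public static String fromDataTypeToLegacyInfo(String modern) {
		String legacy = NEW_TO_LEGACY.get(modern);
		if (legacy == null) {
			throw new IllegalArgumentException("No mapping for " + modern);
		}
		return legacy;
	}
}
```

Keeping both directions in one place lets deprecated accessors like `getFieldTypes()` keep working (converting on the way out) while new code paths work purely on the data-type side.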

[flink] 02/02: [hotfix][connector-hive] Fix Hive type mapping to Table API type information

2019-05-23 Thread twalthr

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f19345767e930f4a9fcae89313de79688949f90
Author: Timo Walther 
AuthorDate: Wed May 22 11:12:23 2019 +0200

[hotfix][connector-hive] Fix Hive type mapping to Table API type information
---
 .../flink/table/catalog/hive/util/HiveTypeUtil.java | 17 ++---
 .../catalog/hive/HiveCatalogGenericMetadataTest.java|  6 +++---
 2 files changed, 9 insertions(+), 14 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index d26d20b..5a94415 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive.util;
 
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
@@ -64,24 +65,18 @@ public class HiveTypeUtil {
return serdeConstants.DOUBLE_TYPE_NAME;
} else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
return serdeConstants.STRING_TYPE_NAME;
-   } else if (type == BasicTypeInfo.DATE_TYPE_INFO) {
+   } else if (type == SqlTimeTypeInfo.DATE) {
return serdeConstants.DATE_TYPE_NAME;
-   } else if (type == BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO) {
+		} else if (type == PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO) {
return serdeConstants.BINARY_TYPE_NAME;
-   } else if (type instanceof SqlTimeTypeInfo) {
+   } else if (type == SqlTimeTypeInfo.TIMESTAMP) {
return serdeConstants.TIMESTAMP_TYPE_NAME;
-   } else if (type instanceof BasicArrayTypeInfo) {
-   return toHiveArrayType((BasicArrayTypeInfo) type);
} else {
throw new UnsupportedOperationException(
				String.format("Flink doesn't support converting type %s to Hive type yet.", type.toString()));
}
}
 
-	private static String toHiveArrayType(BasicArrayTypeInfo arrayTypeInfo) {
-		return String.format(HIVE_ARRAY_TYPE_NAME_FORMAT, toHiveType(arrayTypeInfo.getComponentInfo()));
-   }
-
/**
 * Convert Hive data type to a Flink data type.
 * TODO: the following Hive types are not supported in Flink yet, 
including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
@@ -127,11 +122,11 @@ public class HiveTypeUtil {
case DOUBLE:
return BasicTypeInfo.DOUBLE_TYPE_INFO;
case DATE:
-   return BasicTypeInfo.DATE_TYPE_INFO;
+   return SqlTimeTypeInfo.DATE;
case TIMESTAMP:
return SqlTimeTypeInfo.TIMESTAMP;
case BINARY:
-   return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+			return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
default:
throw new UnsupportedOperationException(
						String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index ae5f5c89..9a35068 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.catalog.hive;
 
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
@@ -65,8 +65,8 @@ public c
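The fix above replaces `instanceof` checks with identity comparisons against the exact `TypeInformation` singletons, so DATE and TIMESTAMP no longer fall into the same branch. The dispatch pattern, reduced to plain Java with stand-in singleton objects (illustrative only, not the Flink classes):

```java
public class HiveTypeDispatchDemo {

	// Stand-ins for distinct TypeInformation singleton instances; the real
	// fix compares against SqlTimeTypeInfo.DATE and SqlTimeTypeInfo.TIMESTAMP.
	static final Object DATE = new Object();
	static final Object TIMESTAMP = new Object();

	// Identity comparison (==) distinguishes the two singletons. A check like
	// "instanceof SqlTimeTypeInfo" matches both, which is why the old code
	// mapped Flink's DATE type to Hive's timestamp type.
	public static String toHiveType(Object type) {
		if (type == DATE) {
			return "date";
		} else if (type == TIMESTAMP) {
			return "timestamp";
		}
		throw new UnsupportedOperationException("Unsupported type");
	}
}
```

Comparing against singletons is safe here because Flink exposes these `TypeInformation` instances as shared constants; for non-singleton types an `equals`-based check would be needed instead.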

[flink] branch master updated (efec532 -> 9f19345)

2019-05-23 Thread twalthr

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from efec532  [hotfix] fix typos. 1. [flink-core] below -> above in 
TypeInformation methods 2. [flink-streaming-java] CLusterUtil -> ClusterUtil in 
LocalStreamEnvironment methods
 new 98f3046  [FLINK-12254][table] Update TableSchema to new type system
 new 9f19345  [hotfix][connector-hive] Fix Hive type mapping to Table API 
type information

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/catalog/hive/util/HiveTypeUtil.java  |  17 +--
 .../hive/HiveCatalogGenericMetadataTest.java   |   6 +-
 .../flink/table/client/cli/CliResultView.java  |   2 +-
 .../apache/flink/table/client/cli/CliUtils.java|   4 +-
 .../org/apache/flink/table/api/TableSchema.java| 140 +++--
 .../apache/flink/table/api/TableSchemaTest.scala   |   8 +-
 .../api/validation/TableSchemaValidationTest.scala |   6 +-
 7 files changed, 122 insertions(+), 61 deletions(-)



[flink] branch master updated (9f19345 -> ed31f4c)

2019-05-23 Thread gary

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9f19345  [hotfix][connector-hive] Fix Hive type mapping to Table API 
type information
 new e10e9ef  [hotfix][core] Delete unused config option 
jobmanager.resourcemanager.reconnect-interval
 new aa9b899  [hotfix][runtime] Fix checkstyle violations in 
RestartStrategyFactory
 new f9f43a5  [hotfix][runtime] Move scheduling-related classes to new 
package
 new ed31f4c  [FLINK-12432][runtime] Add SchedulerNG stub implementation

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/job_manager_configuration.html   |  5 --
 .../flink/configuration/JobManagerOptions.java | 25 +++
 .../dispatcher/DefaultJobManagerRunnerFactory.java |  6 +-
 .../dispatcher/SchedulerNGFactoryFactory.java  | 52 +++
 .../restart/RestartStrategyFactory.java|  9 ++-
 .../restart/ThrowingRestartStrategy.java   | 53 +++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  2 +
 .../factories/DefaultJobMasterServiceFactory.java  |  2 +-
 .../DefaultScheduler.java} | 28 
 .../DefaultSchedulerFactory.java}  | 19 ++
 .../{jobmaster => scheduler}/LegacyScheduler.java  |  3 +-
 .../LegacySchedulerFactory.java|  2 +-
 .../{jobmaster => scheduler}/SchedulerNG.java  |  3 +-
 .../SchedulerNGFactory.java|  2 +-
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  | 77 ++
 .../ThrowingRestartStrategyFactoryTest.java| 67 +++
 .../flink/runtime/jobmaster/JobMasterTest.java |  2 +
 17 files changed, 299 insertions(+), 58 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/LegacySchedulerFactory.java
 => scheduler/DefaultScheduler.java} (78%)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/LegacySchedulerFactory.java
 => scheduler/DefaultSchedulerFactory.java} (79%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => 
scheduler}/LegacyScheduler.java (99%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => 
scheduler}/LegacySchedulerFactory.java (98%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => 
scheduler}/SchedulerNG.java (98%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => 
scheduler}/SchedulerNGFactory.java (97%)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java



[flink] 01/04: [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval

2019-05-23 Thread gary

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e10e9efd22ef76605f9c3a33707221aecccdff0a
Author: Gary Yao 
AuthorDate: Tue May 14 16:43:42 2019 +0200

[hotfix][core] Delete unused config option 
jobmanager.resourcemanager.reconnect-interval
---
 docs/_includes/generated/job_manager_configuration.html  |  5 -
 .../org/apache/flink/configuration/JobManagerOptions.java| 12 
 2 files changed, 17 deletions(-)

diff --git a/docs/_includes/generated/job_manager_configuration.html 
b/docs/_includes/generated/job_manager_configuration.html
index 177c362..73477fe 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -23,11 +23,6 @@
 JVM heap size for the JobManager.
 
 
-jobmanager.resourcemanager.reconnect-interval
-2000
-This option specifies the interval in order to trigger a 
resource manager reconnection if the connection to the resource manager has 
been lost. This option is only intended for internal use.
-
-
 jobmanager.rpc.address
 (none)
 The config parameter defining the network address to connect 
to for communication with the job manager. This value is only interpreted in 
setups where a single JobManager with static name or address exists (simple 
standalone setups, or container setups with dynamic service name resolution). 
It is not used in many high-availability setups, when a leader-election service 
(like ZooKeeper) is used to elect and discover the JobManager leader from 
potentially multiple standby J [...]
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index a91b931..69f445f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -120,18 +120,6 @@ public class JobManagerOptions {
).build());
 
/**
-* This option specifies the interval in order to trigger a resource 
manager reconnection if the connection
-* to the resource manager has been lost.
-*
-* This option is only intended for internal use.
-*/
-	public static final ConfigOption RESOURCE_MANAGER_RECONNECT_INTERVAL =
-		key("jobmanager.resourcemanager.reconnect-interval")
-		.defaultValue(2000L)
-		.withDescription("This option specifies the interval in order to trigger a resource manager reconnection if the connection" +
-			" to the resource manager has been lost. This option is only intended for internal use.");
-
-   /**
 * The location where the JobManager stores the archives of completed 
jobs.
 */
public static final ConfigOption ARCHIVE_DIR =



[flink] 02/04: [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory

2019-05-23 Thread gary

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa9b89919ef10570920fdc6b22cabdf244b70508
Author: Gary Yao 
AuthorDate: Wed May 15 11:41:38 2019 +0200

[hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory
---
 .../runtime/executiongraph/restart/RestartStrategyFactory.java   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index f15ee0b..b5361ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -22,14 +22,19 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
 
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import scala.concurrent.duration.Duration;
+
+/**
+ * Factory for {@link RestartStrategy}.
+ */
 public abstract class RestartStrategyFactory implements Serializable {
private static final long serialVersionUID = 7320252552640522191L;
 
@@ -37,7 +42,7 @@ public abstract class RestartStrategyFactory implements 
Serializable {
private static final String CREATE_METHOD = "createFactory";
 
/**
-* Factory method to create a restart strategy
+* Factory method to create a restart strategy.
 * @return The created restart strategy
 */
public abstract RestartStrategy createRestartStrategy();



[flink] 03/04: [hotfix][runtime] Move scheduling-related classes to new package

2019-05-23 Thread gary

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9f43a51653c4d352e2adf3067254a68a33ed389
Author: Gary Yao 
AuthorDate: Wed May 15 13:24:02 2019 +0200

[hotfix][runtime] Move scheduling-related classes to new package

Move SchedulerNG, SchedulerNGFactory, and their implementations to package
org.apache.flink.runtime.scheduler.
---
 .../flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java   | 3 +--
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java| 2 ++
 .../runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java| 2 +-
 .../apache/flink/runtime/{jobmaster => scheduler}/LegacyScheduler.java | 3 ++-
 .../flink/runtime/{jobmaster => scheduler}/LegacySchedulerFactory.java | 2 +-
 .../org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNG.java | 3 ++-
 .../flink/runtime/{jobmaster => scheduler}/SchedulerNGFactory.java | 2 +-
 .../test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java| 2 ++
 8 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index 6962b6e..b7f80b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
-import org.apache.flink.runtime.jobmaster.LegacySchedulerFactory;
-import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
@@ -36,6 +34,7 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 
 /**
  * Singleton default factory for {@link JobManagerRunner}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 627cc45..cc7139b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -73,6 +73,8 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index 3f80042..c89ecbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
-import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
similarity index 99%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
rename to flink-runtime/src/mai

[flink] 04/04: [FLINK-12432][runtime] Add SchedulerNG stub implementation

2019-05-23 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed31f4c76aa47ff9ded9c46927e4c1e97510088d
Author: Gary Yao 
AuthorDate: Wed May 15 13:57:15 2019 +0200

[FLINK-12432][runtime] Add SchedulerNG stub implementation

Add a new SchedulerNG stub implementation, which will represent the future
default scheduler.

Add a feature toggle to switch between the existing scheduler and the stub
implementation.

Add ThrowingRestartStrategy to validate that the legacy restart strategies
are not used in the new scheduling code paths.

This closes #8452.
---
 .../flink/configuration/JobManagerOptions.java | 13 
 .../dispatcher/DefaultJobManagerRunnerFactory.java |  3 +-
 .../dispatcher/SchedulerNGFactoryFactory.java  | 52 +++
 .../restart/ThrowingRestartStrategy.java   | 53 +++
 .../flink/runtime/scheduler/DefaultScheduler.java  | 78 ++
 .../runtime/scheduler/DefaultSchedulerFactory.java | 73 
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  | 77 +
 .../ThrowingRestartStrategyFactoryTest.java| 67 +++
 8 files changed, 414 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 69f445f..a1d55b1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -160,6 +160,19 @@ public class JobManagerOptions {
 			// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
 			.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
 			.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+	/**
+	 * Config parameter determining the scheduler implementation.
+	 */
+	@Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
+	public static final ConfigOption<String> SCHEDULER =
+		key("jobmanager.scheduler")
+			.defaultValue("legacy")
+			.withDescription(Description.builder()
+				.text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
+				.list(
+					text("'legacy': legacy scheduler"),
+					text("'ng': new generation scheduler"))
+				.build());
 
	// ------------------------------------------------------------------------
 
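The new `jobmanager.scheduler` option is a plain string toggle. A minimal, hypothetical sketch of how such a toggle can be resolved; only the key `jobmanager.scheduler` and the accepted values `legacy`/`ng` come from the commit, while the `resolveScheduler` helper and the plain-`Map` configuration are illustrative stand-ins for Flink's `Configuration`/`ConfigOption` machinery:

```java
import java.util.Map;

// Hypothetical helper mirroring the feature toggle above; not Flink's actual
// factory code. Only the key "jobmanager.scheduler" and the values
// "legacy"/"ng" are taken from the commit.
public class SchedulerToggleExample {

    static String resolveScheduler(Map<String, String> conf) {
        // A missing key falls back to the option's default value "legacy".
        String value = conf.getOrDefault("jobmanager.scheduler", "legacy");
        switch (value) {
            case "legacy":
            case "ng":
                return value;
            default:
                throw new IllegalArgumentException("Unknown scheduler: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveScheduler(Map.of()));                             // legacy
        System.out.println(resolveScheduler(Map.of("jobmanager.scheduler", "ng"))); // ng
    }
}
```

Failing fast on an unknown value matches the intent of a feature toggle: a typo in the config should surface at startup, not silently fall back to one scheduler.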
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index b7f80b6..c0707c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -57,8 +57,7 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
 		final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
-		final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
-			jobManagerServices.getRestartStrategyFactory());
+		final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, jobManagerServices.getRestartStrategyFactory());
 
 		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
 			jobMasterConfiguration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
new file mode 100644
index 000..a757bae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file 

[flink] branch master updated: [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions

2019-05-23 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f39736  [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions
2f39736 is described below

commit 2f397367164c8f915304f6317bf0b9dfde615ea9
Author: zhijiang 
AuthorDate: Fri May 24 00:09:40 2019 +0800

[FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions
---
 .../network_environment_configuration.html |  61 ++
 ...ation.html => network_netty_configuration.html} |   0
 .../generated/task_manager_configuration.html  |  50 -
 docs/ops/config.md |   8 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   |   4 +-
 .../flink/configuration/ConfigConstants.java   |  12 +-
 .../configuration/NetworkEnvironmentOptions.java   | 216 +
 .../flink/configuration/TaskManagerOptions.java| 129 
 ...TaskManagerHeapSizeCalculationJavaBashTest.java |  19 +-
 .../configuration/ConfigOptionsDocGenerator.java   |   1 -
 .../io/network/buffer/NetworkBufferPool.java   |  14 +-
 .../runtime/io/network/netty/NettyConfig.java  |  67 +--
 .../NetworkEnvironmentConfiguration.java   |  85 
 .../runtime/io/network/NetworkEnvironmentTest.java |   6 +-
 .../network/netty/NettyConnectionManagerTest.java  |   7 +-
 .../PartialConsumePipelinedResultTest.java |   4 +-
 .../taskexecutor/NetworkBufferCalculationTest.java |  13 +-
 .../NetworkEnvironmentConfigurationTest.java   |  91 -
 .../taskexecutor/TaskExecutorSubmissionTest.java   |  11 +-
 .../runtime/taskexecutor/TaskExecutorTest.java |   7 +-
 .../taskexecutor/TaskManagerRunnerStartupTest.java |   3 +-
 .../TaskManagerServicesConfigurationTest.java  |  18 +-
 .../TaskSubmissionTestEnvironment.java |   8 +-
 .../TaskCancelAsyncProducerConsumerITCase.java |   3 +-
 .../streaming/runtime/io/InputProcessorUtil.java   |   3 +-
 .../StreamNetworkThroughputBenchmarkTest.java  |   8 +-
 .../flink/test/cancelling/CancelingTestBase.java   |   3 +-
 .../EventTimeWindowCheckpointingITCase.java|   3 +-
 .../manual/StreamingScalabilityAndLatency.java |   3 +-
 .../SuccessAfterNetworkBuffersFailureITCase.java   |   3 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |   3 +-
 .../JobManagerHAProcessFailureRecoveryITCase.java  |   3 +-
 .../recovery/ProcessFailureCancelingITCase.java|   3 +-
 .../flink/test/runtime/NettyEpollITCase.java   |   4 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |   6 +-
 35 files changed, 474 insertions(+), 405 deletions(-)

diff --git a/docs/_includes/generated/network_environment_configuration.html b/docs/_includes/generated/network_environment_configuration.html
new file mode 100644
index 000..7787ed0
--- /dev/null
+++ b/docs/_includes/generated/network_environment_configuration.html
@@ -0,0 +1,61 @@
+Key | Default | Description
+taskmanager.data.port | 0 | The task manager's port used for data exchange operations.
+taskmanager.data.ssl.enabled | true | Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true
+taskmanager.network.detailed-metrics | false | Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.
+taskmanager.network.memory.buffers-per-channel | 2 | Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.
+taskmanager.network.memory.floating-buffers-per-gate | 8 | Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of highe [...]
+taskmanager.network.memory.fraction | 0.1 | Fraction of JVM me

[flink] 05/08: [hotfix][network, tests] Add new unit test for RemoteInputChannel#retriggerSubpartitionRequest

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f2338cc1999cb57264451ab17d4325beda065c3
Author: Zhijiang 
AuthorDate: Mon May 20 19:04:45 2019 +0800

[hotfix][network,tests] Add new unit test for 
RemoteInputChannel#retriggerSubpartitionRequest

It is necessary for flip1 to make sure the PartitionNotFoundException would 
be setted on the RemoteInputChannel while retriggering partition request,
so a new unit test is added to cover this case.
---
 .../io/network/TestingConnectionManager.java   | 50 ++
 .../io/network/TestingPartitionRequestClient.java  | 50 ++
 .../partition/consumer/RemoteInputChannelTest.java | 25 +++
 3 files changed, 125 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
new file mode 100644
index 000..822314d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+/**
+ * A dummy implementation of the {@link ConnectionManager} which is mainly used for creating
+ * {@link PartitionRequestClient} instance in tests.
+ */
+public class TestingConnectionManager implements ConnectionManager {
+
+   @Override
+   public void start() {}
+
+   @Override
+   public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
+   return new TestingPartitionRequestClient();
+   }
+
+   @Override
+   public void closeOpenChannelConnections(ConnectionID connectionId) {}
+
+   @Override
+   public int getNumberOfActiveConnections() {
+   return 0;
+   }
+
+   @Override
+   public int getDataPort() {
+   return -1;
+   }
+
+   @Override
+   public void shutdown() {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java
new file mode 100644
index 000..68abf63
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+/**
+ * A dummy implementation of the {@link PartitionRequestClient} instance which is mainly used
+ * for tests to avoid mock.
+ */
+public class TestingPartitionRequestClient implements PartitionRequestClient {
+
+   @Override
+   public void requestSubpartition(
+   ResultPartitionID partitionId,
+   int subpartitionIndex,
+   RemoteInputChannel inputChannel,
+   int delayMs) {
+   }
+
+   @Override
+   public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
+   }
+
+   @Override

[flink] 07/08: [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e72b3419ea1bcc59c3e93c878bb49de7f0c503c
Author: Zhijiang 
AuthorDate: Thu May 16 15:35:26 2019 +0800

[FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side

The new proposed PartitionException covers all cases of consuming partition failure
that cause the consumer to fail; the JM then decides whether to restart the producer
based on this exception.

[FLINK-6227][network] (part 2) Make current PartitionNotFoundException extend PartitionException

This closes #8242.
---
 ...FoundException.java => PartitionException.java} | 19 +---
 .../partition/PartitionNotFoundException.java  | 17 ++
 .../network/partition/consumer/InputChannel.java   |  4 
 .../partition/consumer/SingleInputGateTest.java| 26 ++
 4 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java
similarity index 68%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java
index 2f78816..75f1cf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionException.java
@@ -21,24 +21,27 @@ package org.apache.flink.runtime.io.network.partition;
 import java.io.IOException;
 
 /**
- * Exception for failed partition requests due to non-existing partitions.
+ * Exception covering all scenarios of consuming partition failure
+ * which causes the consumer task to fail; the job master decides
+ * whether to restart the producer based on this exception.
  */
-public class PartitionNotFoundException extends IOException {
+public abstract class PartitionException extends IOException {
 
private static final long serialVersionUID = 0L;
 
private final ResultPartitionID partitionId;
 
-   public PartitionNotFoundException(ResultPartitionID partitionId) {
+   public PartitionException(String message, ResultPartitionID partitionId) {
+       this(message, partitionId, null);
+   }
+
+   public PartitionException(String message, ResultPartitionID partitionId, Throwable throwable) {
+   super(message, throwable);
+
this.partitionId = partitionId;
}
 
public ResultPartitionID getPartitionId() {
return partitionId;
}
-
-   @Override
-   public String getMessage() {
-   return "Partition " + partitionId + " not found.";
-   }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
index 2f78816..c4e4bd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -18,27 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import java.io.IOException;
-
 /**
  * Exception for failed partition requests due to non-existing partitions.
  */
-public class PartitionNotFoundException extends IOException {
+public class PartitionNotFoundException extends PartitionException {
 
private static final long serialVersionUID = 0L;
 
-   private final ResultPartitionID partitionId;
-
public PartitionNotFoundException(ResultPartitionID partitionId) {
-   this.partitionId = partitionId;
-   }
-
-   public ResultPartitionID getPartitionId() {
-   return partitionId;
-   }
-
-   @Override
-   public String getMessage() {
-   return "Partition " + partitionId + " not found.";
+   super("Partition " + partitionId + " not found.", partitionId);
}
 }
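The class hierarchy this patch introduces can be sketched in isolation. The snippet below is illustrative only: `String` stands in for `ResultPartitionID`, and `shouldRestartProducer` is a hypothetical JM-side check, not actual job master code.

```java
import java.io.IOException;

// Illustrative sketch of the exception hierarchy above; String stands in for
// ResultPartitionID, and shouldRestartProducer is a hypothetical JM-side check.
public class PartitionExceptionSketch {

    abstract static class PartitionException extends IOException {
        private final String partitionId;

        PartitionException(String message, String partitionId) {
            super(message);
            this.partitionId = partitionId;
        }

        String getPartitionId() {
            return partitionId;
        }
    }

    static class PartitionNotFoundException extends PartitionException {
        PartitionNotFoundException(String partitionId) {
            // The message is built in the subclass and passed up, as in the patch.
            super("Partition " + partitionId + " not found.", partitionId);
        }
    }

    static boolean shouldRestartProducer(Throwable cause) {
        // Any consumption failure wrapped in a PartitionException points at the producer.
        return cause instanceof PartitionException;
    }

    public static void main(String[] args) {
        PartitionException e = new PartitionNotFoundException("p-1");
        System.out.println(e.getMessage());           // Partition p-1 not found.
        System.out.println(shouldRestartProducer(e)); // true
    }
}
```

Moving the message into the constructor (instead of overriding `getMessage()`, as the old class did) makes subclasses cheaper to add: each one only supplies its message text.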
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index a08ecc2..c0a204b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
 imp

[flink] 01/08: [hotfix][network, tests] Add new unit tests for ResultPartitionManager#createSubpartitionView

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 222f87f35560ab4be24b11c20b4a04718968797c
Author: Zhijiang 
AuthorDate: Wed May 15 17:29:05 2019 +0800

[hotfix][network,tests] Add new unit tests for ResultPartitionManager#createSubpartitionView

It is necessary to make sure a PartitionNotFoundException is thrown on the producer
side for flip1, so new tests are added to cover creating a subpartition view via
ResultPartitionManager for registered and unregistered partitions.
---
 .../partition/ResultPartitionManagerTest.java  | 63 ++
 1 file changed, 63 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java
new file mode 100644
index 000..99fd5c9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ResultPartitionManager}.
+ */
+public class ResultPartitionManagerTest extends TestLogger {
+
+   /**
+    * Tests that {@link ResultPartitionManager#createSubpartitionView(ResultPartitionID, int, BufferAvailabilityListener)}
+    * would throw {@link PartitionNotFoundException} if this partition was not registered before.
+*/
+   @Test
+   public void testThrowPartitionNotFoundException() throws Exception {
+       final ResultPartitionManager partitionManager = new ResultPartitionManager();
+       final ResultPartition partition = PartitionTestUtils.createPartition();
+       try {
+           partitionManager.createSubpartitionView(partition.getPartitionId(), 0, new NoOpBufferAvailablityListener());
+
+           fail("Should throw PartitionNotFoundException for unregistered partition.");
+       } catch (PartitionNotFoundException notFound) {
+           assertThat(partition.getPartitionId(), Matchers.is(notFound.getPartitionId()));
+       }
+   }
+
+   /**
+    * Tests that {@link ResultPartitionManager#createSubpartitionView(ResultPartitionID, int, BufferAvailabilityListener)}
+    * is successful if this partition was already registered before.
+    */
+   @Test
+   public void testCreateViewForRegisteredPartition() throws Exception {
+       final ResultPartitionManager partitionManager = new ResultPartitionManager();
+       final ResultPartition partition = PartitionTestUtils.createPartition();
+
+       partitionManager.registerResultPartition(partition);
+       partitionManager.createSubpartitionView(partition.getPartitionId(), 0, new NoOpBufferAvailablityListener());
+   }
+}
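The behavior these two tests pin down can be sketched as a tiny registry. This is illustrative only: `String` keys stand in for `ResultPartitionID`, and the real `ResultPartitionManager` bookkeeping is richer.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative registry mirroring the tested behavior: view creation fails for
// unregistered partitions and succeeds for registered ones. Not Flink code.
public class PartitionRegistrySketch {

    static class PartitionNotFoundException extends Exception {
        PartitionNotFoundException(String partitionId) {
            super("Partition " + partitionId + " not found.");
        }
    }

    private final Map<String, Object> registeredPartitions = new HashMap<>();

    void registerResultPartition(String partitionId, Object partition) {
        registeredPartitions.put(partitionId, partition);
    }

    Object createSubpartitionView(String partitionId) throws PartitionNotFoundException {
        Object partition = registeredPartitions.get(partitionId);
        if (partition == null) {
            // Unregistered partition: fail with the dedicated exception type,
            // which the tests above assert carries the requested partition id.
            throw new PartitionNotFoundException(partitionId);
        }
        return partition;
    }

    public static void main(String[] args) throws Exception {
        PartitionRegistrySketch manager = new PartitionRegistrySketch();
        manager.registerResultPartition("p-1", new Object());
        System.out.println(manager.createSubpartitionView("p-1") != null); // true
    }
}
```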



[flink] 04/08: [hotfix][network] Introduce PartitionRequestClient interface for creating simple client instance in tests

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0189db46316eff35d3d07f57ebcf116b1f61cea6
Author: Zhijiang 
AuthorDate: Mon May 20 18:55:59 2019 +0800

[hotfix][network] Introduce PartitionRequestClient interface for creating simple client instance in tests
---
 .../runtime/io/network/ConnectionManager.java  |  2 -
 .../runtime/io/network/LocalConnectionManager.java |  2 -
 .../runtime/io/network/PartitionRequestClient.java | 70 ++
 .../io/network/netty/NettyConnectionManager.java   |  1 +
 ...lient.java => NettyPartitionRequestClient.java} | 22 +++
 .../netty/PartitionRequestClientFactory.java   | 21 +++
 .../partition/consumer/RemoteInputChannel.java |  2 +-
 .../netty/ClientTransportErrorHandlingTest.java| 19 +++---
 ...editBasedPartitionRequestClientHandlerTest.java |  5 +-
 ...t.java => NettyPartitionRequestClientTest.java} |  9 +--
 .../network/partition/InputChannelTestUtils.java   |  2 +-
 .../partition/consumer/RemoteInputChannelTest.java |  2 +-
 12 files changed, 113 insertions(+), 44 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 75f39e9..c342750 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-
 import java.io.IOException;
 
 /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 46ca7fc..319a9ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-
 /**
  * A connection manager implementation to bypass setup overhead for task 
managers running in local
  * execution mode.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
new file mode 100644
index 000..a215700
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import java.io.IOException;
+
+/**
+ * Client to send messages or task events via network for {@link RemoteInputChannel}.
+ */
+public interface PartitionRequestClient {
+
+   /**
+* Requests a remote sub partition.
+*
+    * @param partitionId The identifier of the result partition to be requested.
+    * @param subpartitionIndex The subpartition index in the requested result partition.
+    * @param inputChannel The remote input channel for requesting the subpartition.
+* @param delayMs The request is scheduled within a delay time.
+*/
+   void requestSubpartition(
+   ResultPartitionID partitionId,
+   int subpartitionIndex,
+   RemoteInputChannel inputChannel,
+   int delayMs) throws IOException;
+
+   /**
+* Notifies available credits from one remote input channel.
+*
+    * @param inputChannel The remote input channel which announces the available credits.
+*/
+   void notifyCreditAvailab

[flink] 02/08: [hotfix][network, tests] Add new unit tests for PartitionRequestServerHandler

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f78e494cb4710c916e3e7bee32c792d42f243bb2
Author: Zhijiang 
AuthorDate: Wed May 15 17:36:32 2019 +0800

[hotfix][network,tests] Add new unit tests for PartitionRequestServerHandler

It is necessary to make sure the network server does not transform or swallow the
PartitionNotFoundException thrown by ResultPartitionManager, so a new unit test is
added for PartitionRequestServerHandler to cover this case.
---
 .../netty/PartitionRequestServerHandlerTest.java   | 71 ++
 1 file changed, 71 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java
new file mode 100644
index 000..23f2254
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PartitionRequestServerHandler}.
+ */
+public class PartitionRequestServerHandlerTest extends TestLogger {
+
+   /**
+    * Tests that {@link PartitionRequestServerHandler} responds with an {@link ErrorResponse} wrapping
+    * a {@link PartitionNotFoundException} after receiving an invalid {@link PartitionRequest}.
+    */
+   @Test
+   public void testResponsePartitionNotFoundException() {
+   final PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new PartitionRequestQueue(),
+   true);
+   final EmbeddedChannel channel = new EmbeddedChannel(serverHandler);
+   final ResultPartitionID partitionId = new ResultPartitionID();
+
+   // Write the message of partition request to server
+   channel.writeInbound(new PartitionRequest(partitionId, 0, new InputChannelID(), 2));
+   channel.runPendingTasks();
+
+   // Read the response message after handling partition request
+   final Object msg = channel.readOutbound();
+   assertThat(msg, instanceOf(ErrorResponse.class));
+
+   final ErrorResponse err = (ErrorResponse) msg;
+   assertThat(err.cause, instanceOf(PartitionNotFoundException.class));
+
+   final ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId();
+   assertThat(partitionId, is(actualPartitionId));
+   }
+}



[flink] branch master updated (2f39736 -> 49e1832)

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2f39736  [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions
 new 222f87f  [hotfix][network,tests] Add new unit tests for ResultPartitionManager#createSubpartitionView
 new f78e494  [hotfix][network,tests] Add new unit tests for PartitionRequestServerHandler
 new 22f87ee  [hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition
 new 0189db4  [hotfix][network] Introduce PartititonRequestClient interface for creating simple client instance in tests
 new 9f2338c  [hotfix][network,tests] Add new unit test for RemoteInputChannel#retriggerSubpartitionRequest
 new 6d0b6b9  [hotfix][coordination] Refactor PartitionException to PartitionUpdateException
 new 5e72b34  [FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side
 new 49e1832  [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/network/ConnectionManager.java  |  2 -
 .../runtime/io/network/LocalConnectionManager.java |  2 -
 .../runtime/io/network/PartitionRequestClient.java | 70 +
 .../io/network/netty/NettyConnectionManager.java   |  1 +
 ...lient.java => NettyPartitionRequestClient.java} | 22 ---
 .../netty/PartitionRequestClientFactory.java   | 21 ---
 ...FoundException.java => PartitionException.java} | 19 +++---
 .../partition/PartitionNotFoundException.java  | 17 +
 .../network/partition/consumer/InputChannel.java   |  4 ++
 .../consumer/PartitionConnectionException.java}| 18 +++---
 .../partition/consumer/RemoteInputChannel.java | 10 ++-
 .../partition/consumer/SingleInputGate.java|  5 ++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 ...xception.java => PartitionUpdateException.java} |  8 +--
 .../io/network/TestingConnectionManager.java}  | 15 +++--
 .../io/network/TestingPartitionRequestClient.java} | 35 +++
 .../netty/ClientTransportErrorHandlingTest.java| 19 +++---
 ...editBasedPartitionRequestClientHandlerTest.java |  5 +-
 ...t.java => NettyPartitionRequestClientTest.java} |  9 +--
 .../netty/PartitionRequestServerHandlerTest.java   | 71 +
 .../network/partition/InputChannelTestUtils.java   |  2 +-
 .../partition/ResultPartitionManagerTest.java  | 63 +++
 .../partition/consumer/LocalInputChannelTest.java  | 72 ++
 .../partition/consumer/RemoteInputChannelTest.java | 51 ++-
 .../partition/consumer/SingleInputGateTest.java| 26 
 .../taskexecutor/TaskExecutorSubmissionTest.java   |  4 +-
 26 files changed, 469 insertions(+), 106 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
 rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/{PartitionRequestClient.java => NettyPartitionRequestClient.java} (94%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/{PartitionNotFoundException.java => PartitionException.java} (68%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmanager/PartitionProducerDisposedException.java => io/network/partition/consumer/PartitionConnectionException.java} (62%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/{PartitionException.java => PartitionUpdateException.java} (82%)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java => test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java} (77%)
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/io/network/TaskEventPublisher.java => test/java/org/apache/flink/runtime/io/network/TestingPartitionRequestClient.java} (57%)
 rename flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/{PartitionRequestClientTest.java => NettyPartitionRequestClientTest.java} (95%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java



[flink] 03/08: [hotfix][network, tests] Add new unit test for LocalInputChannel#requestSubpartition

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22f87ee9761b06eeccf8c5adcbd9f4aa96803302
Author: Zhijiang 
AuthorDate: Thu May 16 16:53:51 2019 +0800

[hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition

It is necessary for FLIP-1 to make sure that a PartitionNotFoundException is thrown by LocalInputChannel#requestSubpartition
if the partition was not registered in the ResultPartitionManager before. A new unit test is added to cover this case.
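The not-found test pattern described in this commit message (request a subpartition that was never registered and assert that the thrown exception references the requested partition) can be sketched as a minimal, self-contained example. All class names here are simplified stand-ins for the Flink types, not the actual Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class NotFoundSketch {
    // stand-in for Flink's PartitionNotFoundException: carries the missing partition id
    static class PartitionNotFoundException extends Exception {
        final String partitionId;
        PartitionNotFoundException(String partitionId) {
            super("Partition " + partitionId + " not found.");
            this.partitionId = partitionId;
        }
    }

    // stand-in for ResultPartitionManager: only registered partitions can be served
    static class PartitionManager {
        private final Map<String, Object> registered = new HashMap<>();
        void register(String id) { registered.put(id, new Object()); }
        Object createSubpartitionView(String id) throws PartitionNotFoundException {
            Object partition = registered.get(id);
            if (partition == null) {
                throw new PartitionNotFoundException(id);
            }
            return partition;
        }
    }

    public static void main(String[] args) throws Exception {
        PartitionManager manager = new PartitionManager(); // nothing registered
        String partitionId = "rp-1";
        try {
            manager.createSubpartitionView(partitionId);
            throw new AssertionError("Should throw a PartitionNotFoundException.");
        } catch (PartitionNotFoundException notFound) {
            // the exception must reference exactly the requested partition
            if (!partitionId.equals(notFound.partitionId)) {
                throw new AssertionError("unexpected partition id");
            }
        }
        System.out.println("not-found case covered");
    }
}
```

The try/fail/catch shape mirrors the structure of the new test in the diff below, without depending on Flink internals.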
---
 .../partition/consumer/SingleInputGate.java|  5 ++
 .../partition/consumer/LocalInputChannelTest.java  | 72 ++
 2 files changed, 77 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 6c23698..63504bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate {
}
}
 
+   @VisibleForTesting
+   Timer getRetriggerLocalRequestTimer() {
+   return retriggerLocalRequestTimer;
+   }
+
@Override
public void close() throws IOException {
boolean released = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 505f792..a3bc696 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,6 +61,8 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -247,6 +250,75 @@ public class LocalInputChannelTest {
}
 
/**
+* Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
+* if the result partition was not registered in {@link ResultPartitionManager} and no backoff.
+*/
+   @Test
+   public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate(1);
+   final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+   try {
+   localChannel.requestSubpartition(0);
+
+   fail("Should throw a PartitionNotFoundException.");
+   } catch (PartitionNotFoundException notFound) {
+   assertThat(localChannel.getPartitionId(), Matchers.is(notFound.getPartitionId()));
+   }
+   }
+
+   /**
+* Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered
+* after {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
+* within backoff.
+*/
+   @Test
+   public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
+   final SingleInputGate inputGate = createSingleInputGate(1);
+   final LocalInputChannel localChannel = createLocalInputChannel(
+   inputGate, new ResultPartitionManager(), 1, 1);
+
+   inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
+   localChannel.requestSubpartition(0);
+
+   // The timer should be initialized the first time the partition request is retriggered.
+   assertNotNull(inputGate.getRetriggerLocalRequestTimer());
+   }
+
+   /**
+* Tests that {@link 
LocalInputChannel#retriggerSubparti

[flink] 08/08: [FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49e1832056483d17f540a515cc5be7be1654dd48
Author: Zhijiang 
AuthorDate: Wed May 22 14:45:46 2019 +0800

[FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer

If the consumer cannot establish a connection to the remote task executor while requesting a remote subpartition,
the remote task executor might be unreachable. We wrap this connection exception in the newly introduced
PartitionConnectionException, which also extends PartitionException, so that the job master can decide whether to
restart the upstream region to re-produce the partition data.
This closes #8509.
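The wrapping described above (catch the low-level IOException from connection setup and rethrow a domain exception that carries the partition id, so the scheduler can decide to restart the producer region) can be sketched in isolation. The names below are simplified, hypothetical stand-ins for the Flink types:

```java
import java.io.IOException;

public class WrapSketch {
    static class PartitionException extends Exception {
        final String partitionId;
        PartitionException(String message, String partitionId, Throwable cause) {
            super(message, cause);
            this.partitionId = partitionId;
        }
    }

    static class PartitionConnectionException extends PartitionException {
        PartitionConnectionException(String partitionId, Throwable cause) {
            super("Connection for partition " + partitionId + " not reachable.", partitionId, cause);
        }
    }

    interface ConnectionManager {
        Object createPartitionRequestClient(String connectionId) throws IOException;
    }

    static Object requestClient(ConnectionManager cm, String connectionId, String partitionId)
            throws PartitionConnectionException {
        try {
            return cm.createPartitionRequestClient(connectionId);
        } catch (IOException e) {
            // an IOException here means the remote task executor could not be reached
            throw new PartitionConnectionException(partitionId, e);
        }
    }

    public static void main(String[] args) {
        // a connection manager whose connection attempt always fails
        ConnectionManager failing = id -> { throw new IOException("connection refused"); };
        try {
            requestClient(failing, "host:port", "rp-1");
            throw new AssertionError("expected PartitionConnectionException");
        } catch (PartitionConnectionException e) {
            if (!(e.getCause() instanceof IOException)) {
                throw new AssertionError("original IOException should be preserved as cause");
            }
            System.out.println(e.getMessage());
        }
    }
}
```

Preserving the IOException as the cause keeps the low-level failure diagnosable while the typed wrapper drives the restart decision.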
---
 .../consumer/PartitionConnectionException.java}| 35 +++---
 .../partition/consumer/RemoteInputChannel.java |  8 +++--
 .../io/network/TestingConnectionManager.java   |  4 ++-
 .../partition/consumer/RemoteInputChannelTest.java | 24 +++
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
similarity index 54%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
index 822314d..4713dfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/PartitionConnectionException.java
@@ -16,35 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network;
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 /**
- * A dummy implementation of the {@link ConnectionManager} which is mainly used for creating
- * {@link PartitionRequestClient} instance in tests.
+ * Exception for failed partition requests due to connection failure
+ * with unreachable producer.
  */
-public class TestingConnectionManager implements ConnectionManager {
-
-   @Override
-   public void start() {}
-
-   @Override
-   public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
-   return new TestingPartitionRequestClient();
-   }
+public class PartitionConnectionException extends PartitionException {
 
-   @Override
-   public void closeOpenChannelConnections(ConnectionID connectionId) {}
+   private static final long serialVersionUID = 0L;
 
-   @Override
-   public int getNumberOfActiveConnections() {
-   return 0;
+   public PartitionConnectionException(ResultPartitionID partitionId, Throwable throwable) {
+   super("Connection for partition " + partitionId + " not reachable.", partitionId, throwable);
}
-
-   @Override
-   public int getDataPort() {
-   return -1;
-   }
-
-   @Override
-   public void shutdown() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index fabc495..2d174ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -161,8 +161,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
// Create a client and request the partition
-   partitionRequestClient = connectionManager
-   .createPartitionRequestClient(connectionId);
+   try {
+   partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
+   } catch (IOException e) {
+   // IOExceptions indicate that we could not open a connection to the remote TaskExecutor
+   throw new PartitionConnectionException(partitionId, e);
+   }
 
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
   

[flink] 06/08: [hotfix][coordination] Refactor PartitionException to PartitionUpdateException

2019-05-23 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d0b6b9af462432aef17d753f2bd8c3b8e04b0a1
Author: Zhijiang 
AuthorDate: Fri May 17 12:07:07 2019 +0800

[hotfix][coordination] Refactor PartitionException to PartitionUpdateException

The current usage of PartitionException only describes a failed update-partitions RPC, so renaming the exception has no other effects.
In FLIP-1, PartitionException is used to represent all the cases that indicate restarting the producer when the consumer fails.
---
 .../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java  | 4 ++--
 .../{PartitionException.java => PartitionUpdateException.java}| 8 
 .../flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java| 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index b35d65e..dcfabbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -80,7 +80,7 @@ import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
-import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.PartitionUpdateException;
 import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
@@ -637,7 +637,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
});
} else {
return FutureUtils.completedExceptionally(
-   new PartitionException("No reader with ID " + intermediateResultPartitionID +
+   new PartitionUpdateException("No reader with ID " + intermediateResultPartitionID +
" for task " + executionAttemptID + " was found."));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionUpdateException.java
similarity index 82%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionUpdateException.java
index eecd0ae..fa12426 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionUpdateException.java
@@ -23,19 +23,19 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 /**
 * Exception indicating a problem with the result partitions on the {@link TaskExecutor} side.
  */
-public class PartitionException extends TaskManagerException {
+public class PartitionUpdateException extends TaskManagerException {
 
private static final long serialVersionUID = 6248696963418276618L;
 
-   public PartitionException(String message) {
+   public PartitionUpdateException(String message) {
super(message);
}
 
-   public PartitionException(String message, Throwable cause) {
+   public PartitionUpdateException(String message, Throwable cause) {
super(message, cause);
}
 
-   public PartitionException(Throwable cause) {
+   public PartitionUpdateException(Throwable cause) {
super(cause);
}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 711256e..cde7259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -53,7 +53,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatew

[flink] branch master updated: [FLINK-12236][hive] Support Hive function in HiveCatalog

2019-05-23 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f74bbec  [FLINK-12236][hive] Support Hive function in HiveCatalog
f74bbec is described below

commit f74bbec1794a8a836d2ffc8f248e032610f35157
Author: bowen.li 
AuthorDate: Tue May 21 20:54:59 2019 -0700

[FLINK-12236][hive] Support Hive function in HiveCatalog

This PR creates the HiveCatalogFunction class and adds support for Hive function operations in HiveCatalog.

This closes #8507.
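The core of this change is instanceof-based dispatch over catalog function types, with unsupported types rejected explicitly. A minimal, self-contained sketch of that pattern, using simplified stand-ins for the Flink catalog classes (not the real API):

```java
public class DispatchSketch {
    // simplified stand-ins for the catalog function types in the commit
    interface CatalogFunction { String getClassName(); }

    static class GenericCatalogFunction implements CatalogFunction {
        private final String className;
        GenericCatalogFunction(String className) { this.className = className; }
        public String getClassName() { return className; }
    }

    static class HiveCatalogFunction implements CatalogFunction {
        private final String className;
        HiveCatalogFunction(String className) { this.className = className; }
        public String getClassName() { return className; }
    }

    // dispatch on the concrete type; unknown types are rejected, as in HiveCatalog
    static String instantiateHiveFunction(CatalogFunction function) {
        if (function instanceof GenericCatalogFunction) {
            // generic (Flink) functions get a prefix so they can be recognized on read
            return "flink:" + function.getClassName();
        } else if (function instanceof HiveCatalogFunction) {
            return function.getClassName();
        } else {
            throw new IllegalArgumentException(
                String.format("Unsupported catalog function type %s", function.getClass().getName()));
        }
    }

    public static void main(String[] args) {
        if (!"org.example.MyUdf".equals(instantiateHiveFunction(new HiveCatalogFunction("org.example.MyUdf")))) {
            throw new AssertionError("hive function should keep its class name");
        }
        if (!instantiateHiveFunction(new GenericCatalogFunction("MyFlinkUdf")).startsWith("flink:")) {
            throw new AssertionError("generic function should be prefixed");
        }
        System.out.println("dispatch ok");
    }
}
```

The prefix trick mirrors FLINK_FUNCTION_PREFIX in the diff below: it lets getFunction distinguish Flink-registered functions from native Hive ones when reading back from the metastore.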
---
 .../flink/table/catalog/hive/HiveCatalog.java  | 27 ++--
 .../table/catalog/hive/HiveCatalogFunction.java| 49 +
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 51 +-
 3 files changed, 74 insertions(+), 53 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 22fe08b..1f0bddc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -659,6 +659,8 @@ public class HiveCatalog implements Catalog {
Function hiveFunction;
if (function instanceof GenericCatalogFunction) {
hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) function);
+   } else if (function instanceof HiveCatalogFunction) {
+   hiveFunction = instantiateHiveFunction(functionPath, (HiveCatalogFunction) function);
} else {
throw new CatalogException(
String.format("Unsupported catalog function type %s", function.getClass().getName()));
@@ -694,8 +696,10 @@ public class HiveCatalog implements Catalog {
}
 
Function hiveFunction;
-   if (existingFunction instanceof GenericCatalogFunction && newFunction instanceof GenericCatalogFunction) {
-   hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction);
+   if (newFunction instanceof GenericCatalogFunction) {
+   hiveFunction = instantiateHiveFunction(functionPath, (GenericCatalogFunction) newFunction);
+   } else if (newFunction instanceof HiveCatalogFunction) {
+   hiveFunction = instantiateHiveFunction(functionPath, (HiveCatalogFunction) newFunction);
} else {
throw new CatalogException(
String.format("Unsupported catalog function type %s", newFunction.getClass().getName()));
@@ -758,7 +762,7 @@ public class HiveCatalog implements Catalog {
Function function = client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
 
if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) {
-   // TODO: extract more properties from Hive function and add to CatalogFunction's properties
+   // TODO: extract more properties from Hive function and add to GenericCatalogFunction's properties

Map properties = new HashMap<>();
properties.put(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY, GenericInMemoryCatalog.FLINK_IS_GENERIC_VALUE);
@@ -766,7 +770,9 @@ public class HiveCatalog implements Catalog {
return new GenericCatalogFunction(
function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()), properties);
} else {
-   throw new CatalogException("Hive function is not supported yet");
+   // TODO: extract more properties from Hive function and add to HiveCatalogFunction's properties
+
+   return new HiveCatalogFunction(function.getClassName());
}
} catch (NoSuchObjectException e) {
throw new FunctionNotExistException(catalogName, functionPath, e);
@@ -803,6 +809,19 @@ public class HiveCatalog implements Catalog {
);
}
 
+   private static Function instantiateHiveFunction(ObjectPath functionPath, HiveCatalogFunction function) {
+   return new Function(
+   functionPath.getObjectName(),
+   functionPath.getData

buildbot failure in on flink-docs-master

2019-05-23 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-master while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/1480

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

BUILD FAILED: failed Build docs

Sincerely,
 -The Buildbot





[flink] branch master updated: [FLINK-12235][hive] Support partition related operations in HiveCatalog

2019-05-23 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 95e1686  [FLINK-12235][hive] Support partition related operations in HiveCatalog
95e1686 is described below

commit 95e16862822922972f70a710bc09b7ec31c9043b
Author: Rui Li 
AuthorDate: Wed May 15 19:44:06 2019 +0800

[FLINK-12235][hive] Support partition related operations in HiveCatalog

This PR adds support for Hive partitions in HiveCatalog.

This closes #8449.
---
 .../flink/table/catalog/hive/HiveCatalog.java  | 270 ++-
 .../table/catalog/hive/HiveCatalogPartition.java   |  61 
 .../hive/HiveCatalogGenericMetadataTest.java   | 108 ++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |  18 +
 .../table/catalog/GenericInMemoryCatalog.java  |  82 ++---
 .../table/catalog/GenericInMemoryCatalogTest.java  | 371 +---
 .../org/apache/flink/table/catalog/Catalog.java|   2 +-
 .../flink/table/catalog/CatalogTestBase.java   | 379 +
 .../flink/table/catalog/CatalogTestUtil.java   |   4 -
 9 files changed, 859 insertions(+), 436 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1f0bddc..d2387f0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogFunction;
+import org.apache.flink.table.catalog.GenericCatalogPartition;
 import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -53,6 +54,7 @@ import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -500,7 +503,7 @@ public class HiveCatalog implements Catalog {
// Partition keys
List partitionKeys = new ArrayList<>();
if (!hiveTable.getPartitionKeys().isEmpty()) {
-   partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
+   partitionKeys = getFieldNames(hiveTable.getPartitionKeys());
}
 
if (isView) {
@@ -608,44 +611,285 @@ public class HiveCatalog implements Catalog {
// -- partitions --
 
@Override
+   public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+   throws CatalogException {
+   checkNotNull(tablePath, "Table path cannot be null");
+   checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+   try {
+   return getHivePartition(tablePath, partitionSpec) != null;
+   } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) {
+   return false;
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+   }
+   }
+
+   @Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
	throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsExcept

[flink] branch master updated: [FLINK-12592][python] Add `--force` for python install.

2019-05-23 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 873bb4d  [FLINK-12592][python] Add `--force` for python install.
873bb4d is described below

commit 873bb4d6388194267656f40528fea8d2c6f1d450
Author: sunjincheng121 
AuthorDate: Fri May 24 12:33:27 2019 +0800

[FLINK-12592][python] Add `--force` for python install.

This closes #8525
---
 flink-python/tox.ini | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 23d250a..b8d079f 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -28,7 +28,7 @@ deps =
 pytest
 commands =
 python --version
-python setup.py install
+python setup.py install --force
 pytest
 
 [flake8]