[GitHub] [flink] shuai-xu commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-12 Thread GitBox
shuai-xu commented on a change in pull request #7227: [FLINK-11059] [runtime] 
do not add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#discussion_r293215772
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/OfferedSlot.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A snapshot of the slots on a TaskManager in SlotPool of the JobMaster.
+ */
+public class OfferedSlot implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final int slotIndex;
+
+   private final AllocationID allocationId;
+
+   public OfferedSlot(int index, AllocationID allocationId) {
+   checkArgument(index >= 0);
+   this.slotIndex = index;
+   this.allocationId = checkNotNull(allocationId);
+   }
+
+   public AllocationID getAllocationId() {
+   return allocationId;
+   }
+
+   public int getSlotIndex() {
+   return slotIndex;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   OfferedSlot that = (OfferedSlot) o;
+   return slotIndex == that.slotIndex && 
allocationId.equals(allocationId);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(slotIndex, allocationId);
+   }
 
 Review comment:
   hashCode() is not actually used; equals() is needed by the tests, which use 
assertThat(..., contains(...)).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293215401
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293214787
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+
+/**
+ * Abstract class to provide more information for Hive {@link UDF} and {@link 
GenericUDF} functions.
+ */
+@Internal
+public abstract class HiveScalarFunction extends ScalarFunction 
implements HiveFunction {
+
+   protected final HiveFunctionWrapper hiveFunctionWrapper;
+
+   protected Object[] constantArguments;
+   protected DataType[] argTypes;
+
+   protected transient UDFType function;
+   protected transient ObjectInspector returnInspector;
+
+   private transient boolean isArgsSingleArray;
+
+   HiveScalarFunction(HiveFunctionWrapper hiveFunctionWrapper) {
+   this.hiveFunctionWrapper = hiveFunctionWrapper;
+   }
+
+   @Override
+   public void setArgumentTypesAndConstants(Object[] constantArguments, 
DataType[] argTypes) {
+   this.constantArguments = constantArguments;
+   this.argTypes = argTypes;
+   }
+
+   @Override
+   public boolean isDeterministic() {
+   try {
+   org.apache.hadoop.hive.ql.udf.UDFType udfType =
+   hiveFunctionWrapper.getUDFClass()
+   
.getAnnotation(org.apache.hadoop.hive.ql.udf.UDFType.class);
+
+   return udfType != null && udfType.deterministic() && 
!udfType.stateful();
+   } catch (ClassNotFoundException e) {
+   throw new FlinkHiveUDFException(e);
+   }
+   }
+
+   @Override
+   public TypeInformation getResultType(Class[] signature) {
+   return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+   getHiveResultType(this.constantArguments, 
this.argTypes));
+   }
+
+   @Override
+   public void open(FunctionContext context) {
+   openInternal();
+   isArgsSingleArray = argTypes.length == 1 && (argTypes[0] 
instanceof CollectionDataType);
 
 Review comment:
   For example, passing an int[] to eval(Object...) is OK, but passing an 
Integer[] to eval(Object...) is wrong.
   So we need to set `isArgsSingleArray` when the argument is an Integer[], not 
when it is an int[]. (int[] is a primitive array.)
   
   IMO, this code is wrong; I don't think we should merge it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293213385
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -47,14 +70,33 @@
 * @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
 *  param is not optional but has no default 
value in the {@code info}
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   Stream stream = getParamNameAndAlias(info)
+   .filter(this.params::containsKey)
+   .map(x -> this.params.get(x))
+   .map(x -> valueFromJson(x, info.getValueClass()))
+   .limit(1);
 
 Review comment:
   Thanks for the comments and the code snippet. The implementation without 
Stream is better. For the case where values are specified for multiple aliases, 
I think it is better to throw an exception. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog

2019-06-12 Thread GitBox
flinkbot commented on issue #8720: [FLINK-12771][hive] Support 
ConnectorCatalogTable in HiveCatalog
URL: https://github.com/apache/flink/pull/8720#issuecomment-501556430
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-12 Thread GitBox
danny0405 commented on a change in pull request #8626: [FLINK-12742] Add insert 
into partition grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626#discussion_r293212505
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
 ##
 @@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dml;
+
+import org.apache.flink.sql.parser.SqlProperty;
+import org.apache.flink.sql.parser.ddl.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/** A {@link SqlInsert} that have some extension functions like partition, 
overwrite. **/
+public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
+   private final SqlNodeList staticPartitions;
+
+   public RichSqlInsert(SqlParserPos pos,
+   SqlNodeList keywords,
+   SqlNode targetTable,
+   SqlNode source,
+   SqlNodeList columnList,
+   SqlNodeList staticPartitions) {
+   super(pos, keywords, targetTable, source, columnList);
+   this.staticPartitions = staticPartitions;
+   }
+
+   @Override
+   public void validate(SqlValidator validator, SqlValidatorScope scope) {
+   final SqlNode source = this.getSource();
+   if (this.staticPartitions != null
+   && this.staticPartitions.size() > 0
+   && source instanceof SqlSelect) {
+   appendPartitionProjects(validator, (SqlSelect) source, 
this.staticPartitions);
 
 Review comment:
   I agree; I have added a self-check to ensure that this side effect only 
takes place once.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] danny0405 commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-12 Thread GitBox
danny0405 commented on a change in pull request #8626: [FLINK-12742] Add insert 
into partition grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626#discussion_r293212367
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
 ##
 @@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dml;
+
+import org.apache.flink.sql.parser.SqlProperty;
+import org.apache.flink.sql.parser.ddl.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/** A {@link SqlInsert} that have some extension functions like partition, 
overwrite. **/
+public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
+   private final SqlNodeList staticPartitions;
+
+   public RichSqlInsert(SqlParserPos pos,
+   SqlNodeList keywords,
+   SqlNode targetTable,
+   SqlNode source,
+   SqlNodeList columnList,
+   SqlNodeList staticPartitions) {
+   super(pos, keywords, targetTable, source, columnList);
+   this.staticPartitions = staticPartitions;
+   }
+
+   @Override
+   public void validate(SqlValidator validator, SqlValidatorScope scope) {
+   final SqlNode source = this.getSource();
+   if (this.staticPartitions != null
+   && this.staticPartitions.size() > 0
+   && source instanceof SqlSelect) {
+   appendPartitionProjects(validator, (SqlSelect) source, 
this.staticPartitions);
+   }
+   super.validate(validator, scope);
+   }
+
+   /**
+* @return the list of partition key-value pairs,
+* returns empty if there is no partition specifications.
+*/
+   public SqlNodeList getStaticPartitions() {
 
 Review comment:
   The main usage is for validation. For example, we require that static 
partitions appear in front of dynamic partitions, so we need to know which 
partitions are static.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12771:
---
Labels: pull-request-available  (was: )

> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 opened a new pull request #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog

2019-06-12 Thread GitBox
bowenli86 opened a new pull request #8720: [FLINK-12771][hive] Support 
ConnectorCatalogTable in HiveCatalog
URL: https://github.com/apache/flink/pull/8720
 
 
   ## What is the purpose of the change
   
   This PR adds support for operations of `ConnectorCatalogTable` in 
HiveCatalog.
   
   ## Brief change log
   
   - added support for operations of `ConnectorCatalogTable` in catalog APIs in 
HiveCatalog
   - developed `ConnectorCatalogTableTestBase` and two test impl for each 
catalog to test catalog on `ConnectorCatalogTable`
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - base test class `ConnectorCatalogTableTestBase`
   - `GenericInMemoryConnectorCatalogTableTest`  and 
`HiveConnectorCatalogTableTest` for each catalog impl
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] implement ExecutionSlotAllocator

2019-06-12 Thread GitBox
shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] 
implement ExecutionSlotAllocator
URL: https://github.com/apache/flink/pull/8486#discussion_r293211952
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} 
to allocate slots and
+ * keep the unfulfilled requests for further cancellation.
+ */
+public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
+
+   /**
+* Store the uncompleted slot assignments.
+*/
+   private final Map 
pendingSlotAssignments;
+
+   private final SlotProvider slotProvider;
+
+   private final InputsLocationsRetriever inputsLocationsRetriever;
+
+   private final Time allocationTimeout;
+
+   public DefaultExecutionSlotAllocator(
+   SlotProvider slotProvider,
+   InputsLocationsRetriever inputsLocationsRetriever,
+   Time allocationTimeout) {
+   this.slotProvider = checkNotNull(slotProvider);
+   this.inputsLocationsRetriever = 
checkNotNull(inputsLocationsRetriever);
+   this.allocationTimeout = checkNotNull(allocationTimeout);
+
+   pendingSlotAssignments = new HashMap<>();
+   }
+
+   @Override
+   public Collection allocateSlotsFor(
+   Collection 
executionVertexSchedulingRequirements) {
+
+   List 
slotExecutionVertexAssignments =
+   new 
ArrayList<>(executionVertexSchedulingRequirements.size());
+
+   Set allPreviousAllocationIds = 
computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
+
+   for (ExecutionVertexSchedulingRequirements 
schedulingRequirements : executionVertexSchedulingRequirements) {
+   final ExecutionVertexID executionVertexId = 
schedulingRequirements.getExecutionVertexId();
+   final SlotRequestId slotRequestId = new SlotRequestId();
+   final SlotSharingGroupId slotSharingGroupId = 
schedulingRequirements.getSlotSharingGroupId();
+
+   LOG.debug("Allocate slot with id {} for execution {}", 
slotRequestId, executionVertexId);
+
+   CompletableFuture slotFuture = 
calculatePreferredLocations(
+   executionVertexId,
+   
schedulingRequirements.getPreferredLocations(),
+

[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293211827
 
 

 ##
 File path: flink-ml-parent/flink-ml/pom.xml
 ##
 @@ -0,0 +1,96 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-ml-parent
+   1.9-SNAPSHOT
+   
+   4.0.0
+
+   flink-ml-lib_${scala.binary.version}
+   flink-ml-lib
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-compiler-plugin
+   
+   1.8
+   1.8
+   
+   
+   
+   
+
+   
+   2.8.2
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-table-api-java
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-table-api-java-bridge_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-table-planner_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-ml-api
+   ${project.version}
+   
+   
+   org.apache.flink
+   flink-shaded-jackson
+   
${jackson.version}-${flink.shaded.version}
+   
+
+   
+   
+   com.google.guava
+   guava
 
 Review comment:
   removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293211715
 
 

 ##
 File path: flink-ml-parent/pom.xml
 ##
 @@ -35,5 +35,6 @@ under the License.
 

flink-ml-api
+   flink-ml
 
 Review comment:
   OK, we changed it to flink-ml-lib.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293211372
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/params/validators/RangeValidator.java
 ##
 @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.params.validators;
+
+import org.apache.flink.ml.api.misc.param.ParamValidator;
+
+/**
+ * Range Validtor.
+ */
+public class RangeValidator> implements 
ParamValidator  {
+   T minVal;
+   T maxVal;
+   boolean leftInclusive = true;
+   boolean rightInclusive = true;
+
+   public RangeValidator(T minVal, T maxVal) {
+   this.minVal = minVal;
+   this.maxVal = maxVal;
+   }
+
+   public RangeValidator  withLeftInclusive(boolean tag) {
+   this.leftInclusive = tag;
+   return this;
+   }
+
+   public RangeValidator  withRightInclusive(boolean tag) {
+   this.rightInclusive = tag;
+   return this;
+   }
+
+   public void setLeftInclusive(boolean leftInclusive) {
 
 Review comment:
   This functionality is not used by the remaining parameter definitions, so we 
removed it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293210991
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -47,14 +70,33 @@
 * @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
 *  param is not optional but has no default 
value in the {@code info}
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   Stream paramValue = getParamNameAndAlias(info)
+   .filter(this.params::containsKey)
+   .map(x -> this.params.get(x))
+   .map(x -> valueFromJson(x, info.getValueClass()))
+   .limit(1);
+
+   if (info.isOptional()) {
+   if (info.hasDefaultValue()) {
+   return 
paramValue.reduce(info.getDefaultValue(), (a, b) -> b);
+   } else {
+   return 
paramValue.collect(Collectors.collectingAndThen(Collectors.toList(),
+   a -> {
+   if (a.isEmpty()) {
 
 Review comment:
   Some optional parameters have no default value. For example, in LR model 
prediction, the user needs to get the prediction result, and most of the time the extra 
prediction detail info is not needed. Thus, PredResultColName is required, and 
PredDetailColName is optional without a default value.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293210678
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293210678
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293210678
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293210678
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293210654
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293209304
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -25,16 +25,39 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The map-like container class for parameter. This class is provided to unify 
the interaction with
  * parameters.
  */
 @PublicEvolving
-public class Params implements Serializable {
-   private final Map paramMap = new HashMap<>();
+public class Params implements Serializable, Cloneable {
+   private static final long serialVersionUID = 1L;
+
+   private final Map params;
 
 Review comment:
   Thanks, we added.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293209225
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -25,16 +25,39 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The map-like container class for parameter. This class is provided to unify 
the interaction with
  * parameters.
  */
 @PublicEvolving
-public class Params implements Serializable {
-   private final Map paramMap = new HashMap<>();
+public class Params implements Serializable, Cloneable {
+   private static final long serialVersionUID = 1L;
+
+   private final Map params;
+
+   private transient ObjectMapper mapper;
+
+   public Params() {
 
 Review comment:
   Yes, we added.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on issue #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-12 Thread GitBox
xuyang1706 commented on issue #8632: [FLINK-12744][ml] add shared params in ml 
package
URL: https://github.com/apache/flink/pull/8632#issuecomment-501552635
 
 
   > @xuyang1706 Thanks for the patch. I made a quick pass and left some 
comments.
   > 
   > I have to admit that I do not have enough expertise to say whether the 
various params interfaces added in `o.a.f.ml.params` are necessary or not. It 
will be great if you can help the reviewer understand this better. One possible 
way is to enhance the Java doc for them. It might also be better to check in 
some of the specific `WithParams` subclasses together with the algorithm that 
actually uses them, instead of in this PR. Doing that would make it a lot 
easier to understand what those `WithParams` interfaces are for.
   > 
   > Please let me know what do you think.
   
   @becketqin Thanks for your comments. I removed most of the parameter info 
definitions and kept only the common, typical ones that help in understanding 
the current refactoring of ParamInfo and Params. The remaining parameter infos 
should be understandable from their names. The others will be added together 
with the corresponding algorithms. The current code should be easier for the 
reviewers. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] arganzheng commented on issue #8717: fix issue https://issues.apache.org/jira/browse/FLINK-12725

2019-06-12 Thread GitBox
arganzheng commented on issue #8717: fix issue 
https://issues.apache.org/jira/browse/FLINK-12725
URL: https://github.com/apache/flink/pull/8717#issuecomment-501551892
 
 
   @aljoscha Here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12725) Need to copy flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib location

2019-06-12 Thread arganzheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862726#comment-16862726
 ] 

arganzheng commented on FLINK-12725:


[~aljoscha] No problem. I just opened a pull request here: 
[https://github.com/apache/flink/pull/8717]

> Need to copy flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib 
> location
> 
>
> Key: FLINK-12725
> URL: https://issues.apache.org/jira/browse/FLINK-12725
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Reporter: arganzheng
>Assignee: arganzheng
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I am currently working on a Flink application that uses some of the Hadoop 
> dependencies to write data to HDFS. In my local environment it works 
> fine; however, when I deploy this Flink application on the cluster, it throws 
> an exception related to a compatibility issue.
>  The error message that I am getting is 
>  
> {code:java}
> java.lang.RuntimeException: Could not load the TypeInformation for the class 
> 'org.apache.hadoop.io.Writable'. You may be missing the 
> 'flink-hadoop-compatibility' dependency. at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
>  at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
>  at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
>  at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
>  
> {code}
>  I tried to include the {{flink-hadoop-compatibility}} jar as a Maven 
> dependency in the POM, but it is not detected. The Flink version I am using 
> is 1.8.0.
> However, when I explicitly copy the compatibility JAR to the 
> {{${FLINK-HOME}/lib}} location, I do not get any exception and am able to 
> run the Flink application successfully.
> I dug into the source code and found the problem:
> {code:java}
> package org.apache.flink.api.java.typeutils;
> public class TypeExtractor {
> /** The name of the class representing Hadoop's writable */
> private static final String HADOOP_WRITABLE_CLASS = 
> "org.apache.hadoop.io.Writable";
> private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = 
> "org.apache.flink.api.java.typeutils.WritableTypeInfo";
> // visible for testing
> public static  TypeInformation createHadoopWritableTypeInfo(Class 
> clazz) {
> checkNotNull(clazz);
> Class typeInfoClass;
> try {
> typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, 
> TypeExtractor.class.getClassLoader());
> }
> catch (ClassNotFoundException e) {
> throw new RuntimeException("Could not load the TypeInformation for the class 
> '"
> + HADOOP_WRITABLE_CLASS + "'. You may be missing the 
> 'flink-hadoop-compatibility' dependency.");
> }
> ...
> }
> }
> {code}
>  
> This is because `org.apache.hadoop.io.Writable` is meant to be loaded by 
> TypeExtractor.class.getClassLoader(), which is `AppClassLoader`, while the 
> submitted Flink jar is loaded by `ParentFirstClassLoader`, which is a child 
> of `AppClassLoader`, so `AppClassLoader` cannot load 
> `org.apache.hadoop.io.Writable` from your Flink jar.
> I'm not sure whether it's a bug, but changing the class loader to 
> `Thread.currentThread().getContextClassLoader()` will make it work without 
> copying the flink-hadoop-compatibility jar file to the ${FLINK-HOME}/lib location.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293207355
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+
+/**
+ * Abstract class to provide more information for Hive {@link UDF} and {@link 
GenericUDF} functions.
+ */
+@Internal
+public abstract class HiveScalarFunction extends ScalarFunction 
implements HiveFunction {
+
+   protected final HiveFunctionWrapper hiveFunctionWrapper;
+
+   protected Object[] constantArguments;
+   protected DataType[] argTypes;
+
+   protected transient UDFType function;
+   protected transient ObjectInspector returnInspector;
+
+   private transient boolean isArgsSingleArray;
+
+   HiveScalarFunction(HiveFunctionWrapper hiveFunctionWrapper) {
+   this.hiveFunctionWrapper = hiveFunctionWrapper;
+   }
+
+   @Override
+   public void setArgumentTypesAndConstants(Object[] constantArguments, 
DataType[] argTypes) {
+   this.constantArguments = constantArguments;
+   this.argTypes = argTypes;
+   }
+
+   @Override
+   public boolean isDeterministic() {
+   try {
+   org.apache.hadoop.hive.ql.udf.UDFType udfType =
+   hiveFunctionWrapper.getUDFClass()
+   
.getAnnotation(org.apache.hadoop.hive.ql.udf.UDFType.class);
+
+   return udfType != null && udfType.deterministic() && 
!udfType.stateful();
+   } catch (ClassNotFoundException e) {
+   throw new FlinkHiveUDFException(e);
+   }
+   }
+
+   @Override
+   public TypeInformation getResultType(Class[] signature) {
+   return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+   getHiveResultType(this.constantArguments, 
this.argTypes));
+   }
+
+   @Override
+   public void open(FunctionContext context) {
+   openInternal();
+   isArgsSingleArray = argTypes.length == 1 && (argTypes[0] 
instanceof CollectionDataType);
 
 Review comment:
   Good catch on the multiset thing!  What do you mean by the primitive array 
case? 
   
   BTW, the point here is that `HiveFunction` and its APIs are *temporary*. We 
don't know how and in which form the types are gonna be passed in yet, and this 
part should be refactored later to adapt to any outcome of the type system 
rework. So I think it's ok to not over-emphasize this part for now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
bowenli86 commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293207355
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+
+/**
+ * Abstract class to provide more information for Hive {@link UDF} and {@link 
GenericUDF} functions.
+ */
+@Internal
+public abstract class HiveScalarFunction extends ScalarFunction 
implements HiveFunction {
+
+   protected final HiveFunctionWrapper hiveFunctionWrapper;
+
+   protected Object[] constantArguments;
+   protected DataType[] argTypes;
+
+   protected transient UDFType function;
+   protected transient ObjectInspector returnInspector;
+
+   private transient boolean isArgsSingleArray;
+
+   HiveScalarFunction(HiveFunctionWrapper hiveFunctionWrapper) {
+   this.hiveFunctionWrapper = hiveFunctionWrapper;
+   }
+
+   @Override
+   public void setArgumentTypesAndConstants(Object[] constantArguments, 
DataType[] argTypes) {
+   this.constantArguments = constantArguments;
+   this.argTypes = argTypes;
+   }
+
+   @Override
+   public boolean isDeterministic() {
+   try {
+   org.apache.hadoop.hive.ql.udf.UDFType udfType =
+   hiveFunctionWrapper.getUDFClass()
+   
.getAnnotation(org.apache.hadoop.hive.ql.udf.UDFType.class);
+
+   return udfType != null && udfType.deterministic() && 
!udfType.stateful();
+   } catch (ClassNotFoundException e) {
+   throw new FlinkHiveUDFException(e);
+   }
+   }
+
+   @Override
+   public TypeInformation getResultType(Class[] signature) {
+   return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+   getHiveResultType(this.constantArguments, 
this.argTypes));
+   }
+
+   @Override
+   public void open(FunctionContext context) {
+   openInternal();
+   isArgsSingleArray = argTypes.length == 1 && (argTypes[0] 
instanceof CollectionDataType);
 
 Review comment:
   Good catch on the multiset thing!  What do you mean by the primitive array 
case? 
   
   BTW, the point here is that `HiveFunction` and its APIs are *temporary*. We 
don't know how and in which form the types are gonna be passed in yet, and this 
part should be refactored later to adapt to any outcome of the type system 
rework. So I think it's ok to not over-engineer the testing part for now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8719: [FLINK-12767][python] Support user defined connectors/format

2019-06-12 Thread GitBox
flinkbot commented on issue #8719: [FLINK-12767][python] Support user defined 
connectors/format
URL: https://github.com/apache/flink/pull/8719#issuecomment-501548801
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu opened a new pull request #8719: [FLINK-12767][python] Support user defined connectors/format

2019-06-12 Thread GitBox
dianfu opened a new pull request #8719: [FLINK-12767][python] Support user 
defined connectors/format
URL: https://github.com/apache/flink/pull/8719
 
 
   ## What is the purpose of the change
   
   *Currently, only built-in connectors such as FileSystem/Kafka/ES and only 
built-in formats such as OldCSV/JSON/Avro/CSV are supported. There is no way 
for users to use other connectors or formats in the Python Table API. We 
should provide a convenient way to use connectors/formats that are not 
built in. This pull request adds this support.*
   
   ## Brief change log
   
 - *Add CustomFormatDescriptor and CustomConnectorDescriptor in 
flink-table-common*
 - *Add the corresponding CustomFormatDescriptor and 
CustomConnectorDescriptor in flink-python*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added tests CustomFormatDescriptorTests and 
CustomConnectorDescriptorTests*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12767) Support user defined connectors/format

2019-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12767:
---
Labels: pull-request-available  (was: )

> Support user defined connectors/format
> --
>
> Key: FLINK-12767
> URL: https://issues.apache.org/jira/browse/FLINK-12767
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only built-in connectors such as FileSystem/Kafka/ES and only 
> built-in formats such as OldCSV/JSON/Avro/CSV are supported. We should also 
> provide a convenient way to use connectors/formats that are not built in.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] implement ExecutionSlotAllocator

2019-06-12 Thread GitBox
shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] 
implement ExecutionSlotAllocator
URL: https://github.com/apache/flink/pull/8486#discussion_r293202121
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirements.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The requirements for scheduling a {@link ExecutionVertex}.
+ */
+public class ExecutionVertexSchedulingRequirements {
+
+   private final ExecutionVertexID executionVertexId;
+
+   private final AllocationID previousAllocationId;
+
+   private final ResourceProfile resourceProfile;
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   private final CoLocationConstraint coLocationConstraint;
+
+   private final Collection preferredLocations;
+
+   private ExecutionVertexSchedulingRequirements(
+   ExecutionVertexID executionVertexId,
+   AllocationID previousAllocationId,
+   ResourceProfile resourceProfile,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint,
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] implement ExecutionSlotAllocator

2019-06-12 Thread GitBox
shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] 
implement ExecutionSlotAllocator
URL: https://github.com/apache/flink/pull/8486#discussion_r293202093
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
 ##
 @@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link DefaultExecutionSlotAllocator}.
+ */
+public class DefaultExecutionSlotAllocatorTest extends TestLogger {
+
+   private AllocationToggableSlotProvider slotProvider;
+
+   @Before
+   public void setUp() throws Exception {
+   slotProvider = new AllocationToggableSlotProvider();
+   }
+
+   /**
+* Tests that consumers will get slots after producers are fulfilled.
+*/
+   @Test
+   public void testConsumersAssignedToSlotsAfterProducers() {
+   final ExecutionVertexID producerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+   final ExecutionVertexID consumerId = new ExecutionVertexID(new 
JobVertexID(), 0);
+
+   final TestingInputsLocationsRetriever inputsLocationsRetriever 
= new TestingInputsLocationsRetriever.Builder()
+   .connectConsumerToProducer(consumerId, 
producerId)
+   .build();
+
+   final DefaultExecutionSlotAllocator executionSlotAllocator = 
createExecutionSlotAllocator(inputsLocationsRetriever);
+
+   inputsLocationsRetriever.markScheduled(producerId);
+   inputsLocationsRetriever.markScheduled(consumerId);
+
+   final List 
schedulingRequirements = createSchedulingRequirements(producerId, consumerId);
+   final Collection 
slotExecutionVertexAssignments = 
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+   assertThat(slotExecutionVertexAssignments, hasSize(2));
+
+   final SlotExecutionVertexAssignment producerSlotAssignment = 
findSlotAssignmentByExecutionVertexId(producerId, 
slotExecutionVertexAssignments);
+   final SlotExecutionVertexAssignment 

[GitHub] [flink] flinkbot commented on issue #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-12 Thread GitBox
flinkbot commented on issue #8718: [FLINK-12824][table-planner-blink] Set 
parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#issuecomment-501543948
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] implement ExecutionSlotAllocator

2019-06-12 Thread GitBox
shuai-xu commented on a change in pull request #8486: [FLINK-12372] [runtime] 
implement ExecutionSlotAllocator
URL: https://github.com/apache/flink/pull/8486#discussion_r293201169
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component responsible for assigning slots to a collection of {@link 
Execution}.
+ */
+public interface ExecutionSlotAllocator {
+
+   /**
+* Allocates slots for the given executions.
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12824) set parallelism for stream SQL

2019-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12824:
---
Labels: pull-request-available  (was: )

> set parallelism for stream SQL
> --
>
> Key: FLINK-12824
> URL: https://issues.apache.org/jira/browse/FLINK-12824
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: XuPingyong
>Priority: Major
>  Labels: pull-request-available
>
> Parallelism setting has been developed for batch SQL; the calculation 
> method can also be applied to stream SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293200602
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 ##
 @@ -276,4 +409,69 @@ private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskMa
null),
new TestingFatalErrorHandler());
}
+
+   private static class TestJobAwareShuffleEnvironment implements 
JobAwareShuffleEnvironment {
+
+   private final Function 
hasPartitionsOccupyingLocalResourcesFunction;
+   private Consumer listener = null;
+
+   private TestJobAwareShuffleEnvironment(Function 
hasPartitionsOccupyingLocalResourcesFunction) {
+   this.hasPartitionsOccupyingLocalResourcesFunction = 
hasPartitionsOccupyingLocalResourcesFunction;
+   }
+
+   @Override
+   public int start() throws IOException {
+   return 0;
+   }
+
+   @Override
+   public Collection 
createResultPartitionWriters(JobID jobId, String taskName, ExecutionAttemptID 
executionAttemptID, Collection 
resultPartitionDeploymentDescriptors, MetricGroup outputGroup, MetricGroup 
buffersGroup) {
+   return null;
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection partitionIds) {
+
+   }
+
+   @Override
+   public void 
releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID jobId) {
+
+   }
+
+   @Override
+   public Collection 
getPartitionsOccupyingLocalResources() {
+   return null;
+   }
+
+   @Override
+   public boolean hasPartitionsOccupyingLocalResources(JobID 
jobId) {
+   return 
hasPartitionsOccupyingLocalResourcesFunction.apply(jobId);
+   }
+
+   @Override
+   public void markJobActive(JobID jobId) {
+
+   }
+
+   @Override
+   public void 
setPartitionFailedOrFinishedListener(Consumer listener) {
+   this.listener = listener;
+   }
+
+   @Override
+   public Collection createInputGates(String taskName, 
ExecutionAttemptID executionAttemptID, PartitionProducerStateProvider 
partitionProducerStateProvider, Collection 
inputGateDeploymentDescriptors, MetricGroup parentGroup, MetricGroup 
inputGroup, MetricGroup buffersGroup) {
+   return null;
+   }
+
+   @Override
+   public boolean updatePartitionInfo(ExecutionAttemptID 
consumerID, PartitionInfo partitionInfo) throws IOException, 
InterruptedException {
+   return false;
+   }
+
+   @Override
+   public void close() throws Exception {
+
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293200573
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 ##
 @@ -276,4 +409,69 @@ private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskMa
null),
new TestingFatalErrorHandler());
}
+
+   private static class TestJobAwareShuffleEnvironment implements 
JobAwareShuffleEnvironment {
+
+   private final Function 
hasPartitionsOccupyingLocalResourcesFunction;
+   private Consumer listener = null;
+
+   private TestJobAwareShuffleEnvironment(Function 
hasPartitionsOccupyingLocalResourcesFunction) {
+   this.hasPartitionsOccupyingLocalResourcesFunction = 
hasPartitionsOccupyingLocalResourcesFunction;
+   }
+
+   @Override
+   public int start() throws IOException {
+   return 0;
+   }
+
+   @Override
+   public Collection 
createResultPartitionWriters(JobID jobId, String taskName, ExecutionAttemptID 
executionAttemptID, Collection 
resultPartitionDeploymentDescriptors, MetricGroup outputGroup, MetricGroup 
buffersGroup) {
+   return null;
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection partitionIds) {
+
+   }
+
+   @Override
+   public void 
releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID jobId) {
+
+   }
+
+   @Override
+   public Collection 
getPartitionsOccupyingLocalResources() {
+   return null;
+   }
+
+   @Override
+   public boolean hasPartitionsOccupyingLocalResources(JobID 
jobId) {
+   return 
hasPartitionsOccupyingLocalResourcesFunction.apply(jobId);
+   }
+
+   @Override
+   public void markJobActive(JobID jobId) {
+
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293200547
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 ##
 @@ -276,4 +409,69 @@ private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskMa
null),
new TestingFatalErrorHandler());
}
+
+   private static class TestJobAwareShuffleEnvironment implements 
JobAwareShuffleEnvironment {
+
+   private final Function 
hasPartitionsOccupyingLocalResourcesFunction;
+   private Consumer listener = null;
+
+   private TestJobAwareShuffleEnvironment(Function 
hasPartitionsOccupyingLocalResourcesFunction) {
+   this.hasPartitionsOccupyingLocalResourcesFunction = 
hasPartitionsOccupyingLocalResourcesFunction;
+   }
+
+   @Override
+   public int start() throws IOException {
+   return 0;
+   }
+
+   @Override
+   public Collection 
createResultPartitionWriters(JobID jobId, String taskName, ExecutionAttemptID 
executionAttemptID, Collection 
resultPartitionDeploymentDescriptors, MetricGroup outputGroup, MetricGroup 
buffersGroup) {
+   return null;
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection partitionIds) {
+
+   }
+
+   @Override
+   public void 
releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID jobId) {
+
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293200529
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 ##
 @@ -276,4 +409,69 @@ private TestingTaskExecutor 
createTestingTaskExecutor(TaskManagerServices taskMa
null),
new TestingFatalErrorHandler());
}
+
+   private static class TestJobAwareShuffleEnvironment implements 
JobAwareShuffleEnvironment {
+
+   private final Function 
hasPartitionsOccupyingLocalResourcesFunction;
+   private Consumer listener = null;
+
+   private TestJobAwareShuffleEnvironment(Function 
hasPartitionsOccupyingLocalResourcesFunction) {
+   this.hasPartitionsOccupyingLocalResourcesFunction = 
hasPartitionsOccupyingLocalResourcesFunction;
+   }
+
+   @Override
+   public int start() throws IOException {
+   return 0;
+   }
+
+   @Override
+   public Collection 
createResultPartitionWriters(JobID jobId, String taskName, ExecutionAttemptID 
executionAttemptID, Collection 
resultPartitionDeploymentDescriptors, MetricGroup outputGroup, MetricGroup 
buffersGroup) {
+   return null;
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection partitionIds) {
+
 
 Review comment:
   remove empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12824) set parallelism for stream SQL

2019-06-12 Thread XuPingyong (JIRA)
XuPingyong created FLINK-12824:
--

 Summary: set parallelism for stream SQL
 Key: FLINK-12824
 URL: https://issues.apache.org/jira/browse/FLINK-12824
 Project: Flink
  Issue Type: Task
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: XuPingyong


Parallelism setting has been developed for batch SQL; the calculation method 
can also be applied to stream SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on issue #8681: [FLINK-12585][python] Align 
Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#issuecomment-501541222
 
 
   @dianfu Thanks for your comment! I have made changes according to your 
comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] 
Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r293198978
 
 

 ##
 File path: flink-python/pyflink/common/state_backend.py
 ##
 @@ -0,0 +1,797 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+from pyflink.util.utils import to_j_config, load_java_class
+
+__all__ = [
+'StateBackend',
+'MemoryStateBackend',
+'FsStateBackend',
+'RocksDBStateBackend',
+'CustomStateBackend',
+'PredefinedOptions']
+
+if sys.version > '3':
+xrange = range
+
+
+def _configure(configurable_state_backend, config):
+gateway = get_gateway()
+j_config = to_j_config(config)
+context_class_loader = 
gateway.jvm.Thread.currentThread().getContextClassLoader()
+return configurable_state_backend._j_state_backend.configure(j_config, 
context_class_loader)
+
+
+def _from_j_state_backend(j_state_backend):
+if j_state_backend is None:
+return None
+gateway = get_gateway()
+JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
+JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory \
+.MemoryStateBackend
+JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem \
+.FsStateBackend
+JRocksDBStateBackend = 
gateway.jvm.org.apache.flink.contrib.streaming.state \
+.RocksDBStateBackend
+j_clz = j_state_backend.getClass()
+
+if not get_java_class(JStateBackend).isAssignableFrom(j_clz):
+raise TypeError("The input %s is not an instance of StateBackend." % 
j_state_backend)
+
+if 
get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()):
+return MemoryStateBackend(j_memory_state_backend=j_state_backend)
+elif 
get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()):
+return FsStateBackend(j_fs_state_backend=j_state_backend)
+elif 
get_java_class(JRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()):
+return RocksDBStateBackend(j_rocks_db_state_backend=j_state_backend)
+else:
+return CustomStateBackend(j_state_backend)  # users' customized state 
backend
+
+
+class StateBackend(object):
+"""
+A **State Backend** defines how the state of a streaming application is 
stored and
+checkpointed. Different State Backends store their state in different 
fashions, and use
+different data structures to hold the state of a running application.
+
+For example, the :class:`MemoryStateBackend` keeps working state in the 
memory of the
+TaskManager and stores checkpoints in the memory of the JobManager. The 
backend is
+lightweight and without additional dependencies, but not highly available 
and supports only
+small state.
+
+The :class:`FsStateBackend` keeps working state in the memory of the 
TaskManager and stores
+state checkpoints in a filesystem(typically a replicated highly-available 
filesystem,
+like `HDFS `_, `Ceph `_,
+`S3 `_, `GCS 
`_,
+etc).
+
+The :class:`RocksDBStateBackend` stores working state in `RocksDB 
`_,
+and checkpoints the state by default to a filesystem (similar to the 
:class:`FsStateBackend`).
+
+**Raw Bytes Storage and Backends**
+
+The :class:`StateBackend` creates services for *raw bytes storage* and for 
*keyed state*
+and *operator state*.
+
+The *raw bytes storage* (through the 
`org.apache.flink.runtime.state.CheckpointStreamFactory`)
+is the fundamental service that simply stores bytes in a fault tolerant 
fashion. This service
+is used by the JobManager to store checkpoint and recovery metadata and is 
typically also used
+by the 

[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293198942
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/NotifyingResultPartitionWriter.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/**
+ * TODO: add javadoc.
+ */
+public class NotifyingResultPartitionWriter implements ResultPartitionWriter {
+
+   private final ResultPartitionWriter backingResultPartitionWriter;
+   private final Consumer onSetupAction;
+   private final Consumer onFailAction;
+   private final Consumer onFinishAction;
+
+   public NotifyingResultPartitionWriter(
 
 Review comment:
   could be package private


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293199022
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/NotifyingResultPartitionWriter.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/**
+ * TODO: add javadoc.
+ */
+public class NotifyingResultPartitionWriter implements ResultPartitionWriter {
+
+   private final ResultPartitionWriter backingResultPartitionWriter;
+   private final Consumer onSetupAction;
+   private final Consumer onFailAction;
+   private final Consumer onFinishAction;
+
+   public NotifyingResultPartitionWriter(
+   ResultPartitionWriter backingResultPartitionWriter,
+   Consumer onSetupAction,
+   Consumer onFailAction,
+   Consumer onFinishAction) {
+
+   this.backingResultPartitionWriter = 
backingResultPartitionWriter;
 
 Review comment:
   checkNotNull for the parameters


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293198855
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/NotifyingResultPartitionWriter.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/**
+ * TODO: add javadoc.
+ */
+public class NotifyingResultPartitionWriter implements ResultPartitionWriter {
 
 Review comment:
   package private class


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] 
Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r293198898
 
 

 ##
 File path: flink-python/pyflink/streaming/stream_execution_environment.py
 ##
 @@ -0,0 +1,437 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.common import CheckpointConfig, CheckpointingMode, 
RestartStrategies
+from pyflink.common.execution_config import ExecutionConfig
+from pyflink.common.state_backend import _from_j_state_backend
+from pyflink.common.time_characteristic import TimeCharacteristic
+from pyflink.java_gateway import get_gateway
+from pyflink.util.utils import to_j_config, load_java_class
+
+__all__ = ['StreamExecutionEnvironment']
+
+
+class StreamExecutionEnvironment(object):
+"""
+The StreamExecutionEnvironment is the context in which a streaming program 
is executed. A
+*LocalStreamEnvironment* will cause execution in the attached JVM, a
+*RemoteStreamEnvironment* will cause execution on a remote setup.
+
+The environment provides methods to control the job execution (such as 
setting the parallelism
+or the fault tolerance/checkpointing parameters) and to interact with the 
outside world (data
+access).
+"""
+
+def __init__(self, j_stream_execution_environment):
+self._j_stream_execution_environment = j_stream_execution_environment
+
+def get_config(self):
+"""
+Gets the config object.
+
+:return: The :class:`~pyflink.common.ExecutionConfig` object.
+"""
+return 
ExecutionConfig(self._j_stream_execution_environment.getConfig())
+
+def set_parallelism(self, parallelism):
+"""
+Sets the parallelism for operations executed through this environment.
+Setting a parallelism of x here will cause all operators (such as map,
+batchReduce) to run with x parallel instances. This method overrides 
the
+default parallelism for this environment. The
+*LocalStreamEnvironment* uses by default a value equal to the
+number of hardware contexts (CPU cores / threads). When executing the
+program via the command line client from a JAR file, the default degree
+of parallelism is the one configured for that setup.
+
+:param parallelism: The parallelism.
+:return: This object.
+"""
+self._j_stream_execution_environment = \
+self._j_stream_execution_environment.setParallelism(parallelism)
+return self
+
+def set_max_parallelism(self, max_parallelism):
+"""
+Sets the maximum degree of parallelism defined for the program. The 
upper limit (inclusive)
+is 32767.
+
+The maximum degree of parallelism specifies the upper limit for 
dynamic scaling. It also
+defines the number of key groups used for partitioned state.
+
+:param max_parallelism: Maximum degree of parallelism to be used for 
the program,
+with 0 < maxParallelism <= 2^15 - 1.
+:return: This object.
+"""
+self._j_stream_execution_environment = \
+
self._j_stream_execution_environment.setMaxParallelism(max_parallelism)
+return self
+
+def get_parallelism(self):
+"""
+Gets the parallelism with which operation are executed by default.
+Operations can individually override this value to use a specific
+parallelism.
+
+:return: The parallelism used by operations, unless they override that 
value.
+"""
+return self._j_stream_execution_environment.getParallelism()
+
+def get_max_parallelism(self):
+"""
+Gets the maximum degree of parallelism defined for the program.
+
+The maximum degree of parallelism specifies the upper limit for 
dynamic scaling. It also
+defines the number of key groups used for partitioned state.
+
+

[jira] [Resolved] (FLINK-11107) Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2019-06-12 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-11107.
-
Resolution: Fixed

> Avoid memory stateBackend to create arbitrary folders under HA path when no 
> checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the memory state backend creates a folder named with a random UUID 
> under the HA directory if no checkpoint path was ever configured (the code logic 
> is located in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). 
> However, the default memory state-backend would not only be created on JM 
> side, but also on each task manager's side, which means many folders with 
> random UUID would be created under HA directory. It would result in exception 
> like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs can be submitted unless we clean up those 
> directories manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11107) Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2019-06-12 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862703#comment-16862703
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11107:
-

Thanks for the contribution [~yunta].

Merged -
1.9.0: c2316f69e5b5624a1aaad5066c8035cd0234da92
1.8.1: ba42d86267fa3ebe8e439794149a85823e047942

> Avoid memory stateBackend to create arbitrary folders under HA path when no 
> checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the memory state backend creates a folder named with a random UUID 
> under the HA directory if no checkpoint path was ever configured (the code logic 
> is located in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). 
> However, the default memory state-backend would not only be created on JM 
> side, but also on each task manager's side, which means many folders with 
> random UUID would be created under HA directory. It would result in exception 
> like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs can be submitted unless we clean up those 
> directories manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293198756
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/NotifyingResultPartitionWriter.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/**
+ * TODO: add javadoc.
 
 Review comment:
   should add


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293198855
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/NotifyingResultPartitionWriter.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/**
+ * TODO: add javadoc.
+ */
+public class NotifyingResultPartitionWriter implements ResultPartitionWriter {
 
 Review comment:
   package private class


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] 
Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r293198559
 
 

 ##
 File path: flink-python/pyflink/common/checkpoint_config.py
 ##
 @@ -0,0 +1,307 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyflink.common.checkpointing_mode import CheckpointingMode
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['CheckpointConfig', 'ExternalizedCheckpointCleanup']
+
+
+class CheckpointConfig(object):
+"""
+Configuration that captures all checkpointing related settings.
+
+:data:`DEFAULT_MODE`:
+
+The default checkpoint mode: exactly once.
+
+:data:`DEFAULT_TIMEOUT`:
+
+The default timeout of a checkpoint attempt: 10 minutes.
+
+:data:`DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS`:
+
+The default minimum pause to be made between checkpoints: none.
+
+:data:`DEFAULT_MAX_CONCURRENT_CHECKPOINTS`:
+
+The default limit of concurrently happening checkpoints: one.
+"""
+
+DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE
+
+DEFAULT_TIMEOUT = 10 * 60 * 1000
+
+DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0
+
+DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1
+
+def __init__(self, j_checkpoint_config):
+self._j_checkpoint_config = j_checkpoint_config
+
+def is_checkpointing_enabled(self):
+"""
+Checks whether checkpointing is enabled.
+
+:return: True if checkpointing is enables, false otherwise.
+"""
+return self._j_checkpoint_config.isCheckpointingEnabled()
+
+def get_checkpointing_mode(self):
+"""
+Gets the checkpointing mode (exactly-once vs. at-least-once).
+
+:return: The :class:`CheckpointingMode`.
+"""
+return CheckpointingMode._from_j_checkpointing_mode(
+self._j_checkpoint_config.getCheckpointingMode())
+
+def set_checkpointing_mode(self, checkpointing_mode):
+"""
+Sets the checkpointing mode (exactly-once vs. at-least-once).
+
+:param checkpointing_mode: The :class:`CheckpointingMode`.
+"""
+self._j_checkpoint_config.setCheckpointingMode(
+CheckpointingMode._to_j_checkpointing_mode(checkpointing_mode))
+
+def get_checkpoint_interval(self):
+"""
+Gets the interval in which checkpoints are periodically scheduled.
+
+This setting defines the base interval. Checkpoint triggering may be 
delayed by the settings
+:func:`get_max_concurrent_checkpoints` and 
:func:`get_min_pause_between_checkpoints`.
+
+:return: The checkpoint interval, in milliseconds.
+"""
+return self._j_checkpoint_config.getCheckpointInterval()
+
+def set_checkpoint_interval(self, checkpoint_interval):
+"""
+Sets the interval in which checkpoints are periodically scheduled.
+
+This setting defines the base interval. Checkpoint triggering may be 
delayed by the settings
+:func:`set_max_concurrent_checkpoints` and 
:func:`set_min_pause_between_checkpoints`.
+
+:param checkpoint_interval: The checkpoint interval, in milliseconds.
+"""
+self._j_checkpoint_config.setCheckpointInterval(checkpoint_interval)
+
+def get_checkpoint_timeout(self):
+"""
+Gets the maximum time that a checkpoint may take before being 
discarded.
+
+:return: The checkpoint timeout, in milliseconds.
+"""
+return self._j_checkpoint_config.getCheckpointTimeout()
+
+def set_checkpoint_timeout(self, checkpoint_timeout):
+"""
+Sets the maximum time that a checkpoint may take before being 
discarded.
+
+:param checkpoint_timeout: The checkpoint timeout, in milliseconds.
+"""
+self._j_checkpoint_config.setCheckpointTimeout(checkpoint_timeout)
+
+def get_min_pause_between_checkpoints(self):
+"""
+Gets the minimal pause between checkpointing attempts. This setting 
defines how soon the

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] 
Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r293198379
 
 

 ##
 File path: flink-python/pyflink/batch/execution_environment.py
 ##
 @@ -0,0 +1,193 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyflink.common.execution_config import ExecutionConfig
+from pyflink.common.restart_strategy import RestartStrategies
+from pyflink.java_gateway import get_gateway
+from pyflink.util.utils import load_java_class
+
+
+class ExecutionEnvironment(object):
+"""
+The ExecutionEnvironment is the context in which a program is executed.
+
+The environment provides methods to control the job execution (such as 
setting the parallelism)
+and to interact with the outside world (data access).
+"""
+
+def __init__(self, j_execution_environment):
+self._j_execution_environment = j_execution_environment
+
 
 Review comment:
   StreamExecutionEnvironment/ExecutionEnvironment were added only to support 
Table API configuration, so only the configuration-related APIs are included.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293198233
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -239,6 +239,7 @@ public TaskExecutor(
this.taskManagerLocation = 
taskExecutorServices.getTaskManagerLocation();
this.localStateStoresManager = 
taskExecutorServices.getTaskManagerStateStore();
this.shuffleEnvironment = 
taskExecutorServices.getShuffleEnvironment();
+   
taskExecutorServices.getShuffleEnvironment().setPartitionFailedOrFinishedListener(this::notifyFinalizedPartition);
 
 Review comment:
   nit: maybe `shuffleEnvironment.setPartitionFailedOrFinishedListener` 
directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12723) Adds a wiki page about setting up a Python Table API development environment

2019-06-12 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-12723:
---

Assignee: sunjincheng

> Adds a wiki page about setting up a Python Table API development environment
> 
>
> Key: FLINK-12723
> URL: https://issues.apache.org/jira/browse/FLINK-12723
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: sunjincheng
>Priority: Major
>
> We should add a wiki page showing how to set up a Python Table API 
> development environment to help contributors who are interested in the Python 
> Table API to join in easily.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293198233
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -239,6 +239,7 @@ public TaskExecutor(
this.taskManagerLocation = 
taskExecutorServices.getTaskManagerLocation();
this.localStateStoresManager = 
taskExecutorServices.getTaskManagerStateStore();
this.shuffleEnvironment = 
taskExecutorServices.getShuffleEnvironment();
+   
taskExecutorServices.getShuffleEnvironment().setPartitionFailedOrFinishedListener(this::notifyFinalizedPartition);
 
 Review comment:
   nit: maybe call `shuffleEnvironment.setPartitionFailedOrFinishedListener` 
directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] 
Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r293198079
 
 

 ##
 File path: flink-python/pyflink/common/checkpointing_mode.py
 ##
 @@ -0,0 +1,105 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['CheckpointingMode']
+
+
+class CheckpointingMode(object):
+"""
+The checkpointing mode defines what consistency guarantees the system 
gives in the presence of
+failures.
+
+When checkpointing is activated, the data streams are replayed such that 
lost parts of the
+processing are repeated. For stateful operations and functions, the 
checkpointing mode defines
+whether the system draws checkpoints such that a recovery behaves as if 
the operators/functions
+see each record "exactly once" (:data:`CheckpointingMode.EXACTLY_ONCE`), 
or whether the
+checkpoints are drawn in a simpler fashion that typically encounters some 
duplicates upon
+recovery (:data:`CheckpointingMode.AT_LEAST_ONCE`)
+
+:data:`EXACTLY_ONCE`:
+
+Sets the checkpointing mode to "exactly once". This mode means that the 
system will
+checkpoint the operator and user function state in such a way that, upon 
recovery,
+every record will be reflected exactly once in the operator state.
+
+For example, if a user function counts the number of elements in a stream,
+this number will consistently be equal to the number of actual elements in 
the stream,
+regardless of failures and recovery.
+
+Note that this does not mean that each record flows through the streaming 
data flow
+only once. It means that upon recovery, the state of operators/functions 
is restored such
+that the resumed data streams pick up exactly at after the last 
modification to the state.
+
+Note that this mode does not guarantee exactly-once behavior in the 
interaction with
+external systems (only state in Flink's operators and user functions). The 
reason for that
+is that a certain level of "collaboration" is required between two systems 
to achieve
+exactly-once guarantees. However, for certain systems, connectors can be 
written that
+facilitate this collaboration.
+
+This mode sustains high throughput. Depending on the data flow graph and 
operations,
+this mode may increase the record latency, because operators need to align 
their input
+streams, in order to create a consistent snapshot point. The latency 
increase for simple
+dataflows (no repartitioning) is negligible. For simple dataflows with 
repartitioning, the
+average latency remains small, but the slowest records typically have an 
increased latency.
+
+:data:`AT_LEAST_ONCE`:
+
+Sets the checkpointing mode to "at least once". This mode means that the 
system will
+checkpoint the operator and user function state in a simpler way. Upon 
failure and recovery,
+some records may be reflected multiple times in the operator state.
+
+For example, if a user function counts the number of elements in a stream,
+this number will equal to, or larger, than the actual number of elements 
in the stream,
+in the presence of failure and recovery.
+
+This mode has minimal impact on latency and may be preferable in very-low 
latency
+scenarios, where a sustained very-low latency (such as few milliseconds) 
is needed,
+and where occasional duplicate messages (on recovery) do not matter.
+"""
+
+EXACTLY_ONCE = 0
+AT_LEAST_ONCE = 1
+
+@classmethod
+def _from_j_checkpointing_mode(cls, j_checkpointing_mode):
+gateway = get_gateway()
+JCheckpointingMode = \
+gateway.jvm.org.apache.flink.streaming.api.CheckpointingMode
+if j_checkpointing_mode == JCheckpointingMode.EXACTLY_ONCE:
+return CheckpointingMode.EXACTLY_ONCE
+elif j_checkpointing_mode == JCheckpointingMode.AT_LEAST_ONCE:
+  

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API

2019-06-12 Thread GitBox
WeiZhong94 commented on a change in pull request #8681: [FLINK-12585][python] 
Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r293198094
 
 

 ##
 File path: flink-python/pyflink/common/execution_config.py
 ##
 @@ -0,0 +1,720 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from pyflink.common.execution_mode import ExecutionMode
+from pyflink.common.input_dependency_constraint import 
InputDependencyConstraint
+from pyflink.common.restart_strategy import RestartStrategies
+from pyflink.java_gateway import get_gateway
+from pyflink.util.utils import load_java_class
+
+if sys.version >= '3':
+unicode = str
+
+__all__ = ['ExecutionConfig']
+
+
+class ExecutionConfig(object):
+"""
+A config to define the behavior of the program execution. It allows to 
define (among other
+options) the following settings:
+
+- The default parallelism of the program, i.e., how many parallel tasks to 
use for
+  all functions that do not define a specific value directly.
+
+- The number of retries in the case of failed executions.
+
+- The delay between execution retries.
+
+- The :class:`ExecutionMode` of the program: Batch or Pipelined.
+  The default execution mode is :data:`ExecutionMode.PIPELINED`
+
+- Enabling or disabling the "closure cleaner". The closure cleaner 
pre-processes
+  the implementations of functions. In case they are (anonymous) inner 
classes,
+  it removes unused references to the enclosing class to fix certain 
serialization-related
+  problems and to reduce the size of the closure.
+
+- The config allows to register types and serializers to increase the 
efficiency of
+  handling *generic types* and *POJOs*. This is usually only needed
+  when the functions return not only the types declared in their 
signature, but
+  also subclasses of those types.
+
+:data:`PARALLELISM_DEFAULT`:
+
+The flag value indicating use of the default parallelism. This value can
+be used to reset the parallelism back to the default state.
+
+:data:`PARALLELISM_UNKNOWN`:
+
+The flag value indicating an unknown or unset parallelism. This value is
+not a valid parallelism and indicates that the parallelism should remain
+unchanged.
+"""
+
+PARALLELISM_DEFAULT = -1
+
+PARALLELISM_UNKNOWN = -2
+
+def __init__(self, j_execution_config):
+self._j_execution_config = j_execution_config
+
+def enable_closure_cleaner(self):
+"""
+Enables the ClosureCleaner. This analyzes user code functions and sets 
fields to null
+that are not used. This will in most cases make closures or anonymous 
inner classes
+serializable that where not serializable due to some Scala or Java 
implementation artifact.
+User code must be serializable because it needs to be sent to worker 
nodes.
+
+:return: This object.
+"""
+self._j_execution_config = 
self._j_execution_config.enableClosureCleaner()
+return self
+
+def disable_closure_cleaner(self):
+"""
+Disables the ClosureCleaner.
+
+see :func:`enable_closure_cleaner`
+
+:return: This object.
+"""
+self._j_execution_config = 
self._j_execution_config.disableClosureCleaner()
+return self
+
+def is_closure_cleaner_enabled(self):
+"""
+Returns whether the ClosureCleaner is enabled.
+
+see :func:`enable_closure_cleaner`
+
+:return: ``True`` means enable and ``False`` means disable.
+"""
+return self._j_execution_config.isClosureCleanerEnabled()
+
+def set_auto_watermark_interval(self, interval):
+"""
+Sets the interval of the automatic watermark emission. Watermarks are 
used throughout
+the streaming system to keep track of the progress of time. They are 
used, for example,
+for time based windowing.
+
+:param interval: The integer value interval between 

[GitHub] [flink] asfgit closed pull request #7281: [FLINK-11107] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2019-06-12 Thread GitBox
asfgit closed pull request #7281: [FLINK-11107] Avoid memory stateBackend to 
create arbitrary folders under HA path when no checkpoint path configured
URL: https://github.com/apache/flink/pull/7281
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
gaoyunhaii commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293197764
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java
 ##
 @@ -60,4 +61,26 @@ public void testCreateViewForRegisteredPartition() throws 
Exception {
partitionManager.registerResultPartition(partition);

partitionManager.createSubpartitionView(partition.getPartitionId(), 0, new 
NoOpBufferAvailablityListener());
}
+
+   @Test
+   public void testExternallyManagedPartitionReleaseIfFlagDisabled() {
 
 Review comment:
   I think it would be better to have a common utility method with a parameter 
indicating whether to release externally managed partitions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12720) Add the Python Table API Sphinx docs

2019-06-12 Thread Huang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-12720:


Assignee: Huang Xingbo

> Add the Python Table API Sphinx docs
> 
>
> Key: FLINK-12720
> URL: https://issues.apache.org/jira/browse/FLINK-12720
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Documentation
>Reporter: Dian Fu
>Assignee: Huang Xingbo
>Priority: Major
>
> As the Python Table API is added, we should add the Python Table API Sphinx 
> docs. This includes the following work:
> 1) Add scripts to build the Sphinx docs
> 2) Add a link in the main page to the generated doc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8717: fix issue https://issues.apache.org/jira/browse/FLINK-12725

2019-06-12 Thread GitBox
flinkbot commented on issue #8717: fix issue 
https://issues.apache.org/jira/browse/FLINK-12725
URL: https://github.com/apache/flink/pull/8717#issuecomment-501538731
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293196850
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Job-aware variation of the {@link ShuffleEnvironment} interface.
+ *
+ * @see ShuffleEnvironment
+ */
+public interface JobAwareShuffleEnvironment extends AutoCloseable {
+
+   /**
+* Start the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
 
 Review comment:
   Hmm, my previous thought was to put the `JobID` into the shuffle context, which 
was proposed by Andrey in another PR, and pass it via 
`ShuffleEnvironment#createResultPartitionWriters(context)`. Then 
`JobAwareShuffleEnvironmentImpl` could get the job id from the context, which 
avoids having two separate `createResultPartitionWriters` methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
gaoyunhaii commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293187444
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Job-aware variation of the {@link ShuffleEnvironment} interface.
+ *
+ * @see ShuffleEnvironment
+ */
+public interface JobAwareShuffleEnvironment extends AutoCloseable {
+
+   /**
+* Start the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Factory method for the task's {@link ResultPartitionWriter}s.
+*
+* @parem jobId the job id, used for tracking which job the created 
partitions belongs to
+* @param taskName the task name, used for logs
+* @param executionAttemptID execution attempt id of the task
+* @param resultPartitionDeploymentDescriptors descriptors of the 
partition, produced by the task
+* @param outputGroup shuffle specific group for output metrics
+* @param buffersGroup shuffle specific group for buffer metrics
+* @return collection of the task's {@link ResultPartitionWriter}s
+*/
+   Collection createResultPartitionWriters(
+   JobID jobId,
+   String taskName,
+   ExecutionAttemptID executionAttemptID,
+   Collection 
resultPartitionDeploymentDescriptors,
+   MetricGroup outputGroup,
+   MetricGroup buffersGroup);
+
+   /**
+* Release local resources occupied with the given partitions.
+*
+* @param partitionIds partition ids to release
+*/
+   void releaseFinishedPartitions(JobID jobId, 
Collection partitionIds);
+
+   /**
+* Release all local resources occupied with partitions belonging to 
the given job.
+*
+* @param jobId
+*/
+   void releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID jobId);
+
+   /**
+* Report partitions which still occupy some resources locally.
 
 Review comment:
   Reports


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
gaoyunhaii commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293187378
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Job-aware variation of the {@link ShuffleEnvironment} interface.
+ *
+ * @see ShuffleEnvironment
+ */
+public interface JobAwareShuffleEnvironment extends AutoCloseable {
+
+   /**
+* Start the internal related services upon {@link TaskExecutor}'s 
startup.
+*
+* @return a port to connect to the task executor for shuffle data 
exchange, -1 if only local connection is possible.
+*/
+   int start() throws IOException;
+
+   /**
+* Factory method for the task's {@link ResultPartitionWriter}s.
+*
+* @parem jobId the job id, used for tracking which job the created 
partitions belongs to
+* @param taskName the task name, used for logs
+* @param executionAttemptID execution attempt id of the task
+* @param resultPartitionDeploymentDescriptors descriptors of the 
partition, produced by the task
+* @param outputGroup shuffle specific group for output metrics
+* @param buffersGroup shuffle specific group for buffer metrics
+* @return collection of the task's {@link ResultPartitionWriter}s
+*/
+   Collection createResultPartitionWriters(
+   JobID jobId,
+   String taskName,
+   ExecutionAttemptID executionAttemptID,
+   Collection 
resultPartitionDeploymentDescriptors,
+   MetricGroup outputGroup,
+   MetricGroup buffersGroup);
+
+   /**
+* Release local resources occupied with the given partitions.
 
 Review comment:
   Similarly, Releases


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
gaoyunhaii commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293196661
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -80,6 +80,8 @@
/** Type of this partition. Defines the concrete subpartition 
implementation to use. */
private final ResultPartitionType partitionType;
 
+   private boolean isManagedExternally;
 
 Review comment:
   For externally managed result partitions, we may also need to bypass the 
checking of pendingReference in createSubpartitionView and 
onConsumedSubpartition to support reading multiple times.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
gaoyunhaii commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293187277
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Job-aware variation of the {@link ShuffleEnvironment} interface.
+ *
+ * @see ShuffleEnvironment
+ */
+public interface JobAwareShuffleEnvironment extends AutoCloseable {
+
+   /**
+* Start the internal related services upon {@link TaskExecutor}'s 
startup.
 
 Review comment:
   Starts


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
gaoyunhaii commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293194001
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 ##
 @@ -104,6 +104,24 @@ public void testSerializationWithNettyShuffleDescriptor() 
throws Exception {
assertThat(shuffleDescriptorCopy.getConnectionId(), 
is(connectionID));
}
 
+   @Test
+   public void testManagedExternallyFlag() {
+   for (ResultPartitionType partitionType : 
ResultPartitionType.values()) {
+   ResultPartitionDeploymentDescriptor partitionDescriptor 
= new ResultPartitionDeploymentDescriptor(
+   new PartitionDescriptor(resultId, partitionId, 
partitionType, numberOfSubpartitions, connectionIndex),
+   (ShuffleDescriptor) ResultPartitionID::new,
 
 Review comment:
   It seems the explicit type cast is not needed here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12814) Support a traditional and scrolling view of result (non-interactive)

2019-06-12 Thread Zhenghua Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-12814:
-
Attachment: (was: image-2019-06-12-16-11-06-070.png)

> Support a traditional and scrolling view of result (non-interactive)
> 
>
> Key: FLINK-12814
> URL: https://issues.apache.org/jira/browse/FLINK-12814
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Affects Versions: 1.8.0
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In table mode, we want to introduce a non-interactive view (so-called 
> FinalizedResult), which submits SQL statements (DQLs) in attach mode with a 
> user-defined timeout, fetches results until the job has finished/failed/timed out 
> or is interrupted by the user (Ctrl+C), and outputs them in a non-interactive way 
> (the behavior in change-log mode is under discussion)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12725) Need to copy flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib location

2019-06-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12725:
---
Labels: pull-request-available  (was: )

> Need to copy flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib 
> location
> 
>
> Key: FLINK-12725
> URL: https://issues.apache.org/jira/browse/FLINK-12725
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Reporter: arganzheng
>Assignee: arganzheng
>Priority: Minor
>  Labels: pull-request-available
>
> I am currently working on a Flink application that uses some of the Hadoop 
> dependencies to write data to HDFS.  In the local environment it works 
> fine; however, when I deploy this Flink application on the cluster, it throws 
> an exception related to a compatibility issue.
>  The error message that I am getting is:
>  
> {code:java}
> java.lang.RuntimeException: Could not load the TypeInformation for the class 
> 'org.apache.hadoop.io.Writable'. You may be missing the 
> 'flink-hadoop-compatibility' dependency. at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
>  at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
>  at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
>  at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
>  
> {code}
>  I tried to include the {{flink-hadoop-compatibility}} jar as a Maven 
> dependency in the POM, but it is not detected. The Flink version I am using 
> is 1.8.0.
> However, when I explicitly copy the compatibility JAR to the 
> {{${FLINK-HOME}/lib}} location, I do not get any exception and am able to 
> run the Flink application successfully.
> I dove into the source code and found the problem:
> {code:java}
> package org.apache.flink.api.java.typeutils;
> public class TypeExtractor {
> /** The name of the class representing Hadoop's writable */
> private static final String HADOOP_WRITABLE_CLASS = 
> "org.apache.hadoop.io.Writable";
> private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = 
> "org.apache.flink.api.java.typeutils.WritableTypeInfo";
> // visible for testing
> public static  TypeInformation createHadoopWritableTypeInfo(Class 
> clazz) {
> checkNotNull(clazz);
> Class typeInfoClass;
> try {
> typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, 
> TypeExtractor.class.getClassLoader());
> }
> catch (ClassNotFoundException e) {
> throw new RuntimeException("Could not load the TypeInformation for the class 
> '"
> + HADOOP_WRITABLE_CLASS + "'. You may be missing the 
> 'flink-hadoop-compatibility' dependency.");
> }
> ...
> }
> }
> {code}
>  
> This is because `org.apache.hadoop.io.Writable` is meant to be loaded by 
> TypeExtractor.class.getClassLoader(), which is `AppClassLoader`, while the 
> submitted Flink jar is loaded by `ParentFirstClassLoader`, which is a child 
> of `AppClassLoader`, so `AppClassLoader` cannot load 
> `org.apache.hadoop.io.Writable` from your Flink jar.
> I'm not sure if it's a bug, but changing the class loader to 
> `Thread.currentThread().getContextClassLoader()` makes it work without 
> copying the flink-hadoop-compatibility jar file to the ${FLINK-HOME}/lib location.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] arganzheng opened a new pull request #8717: fix issue https://issues.apache.org/jira/browse/FLINK-12725

2019-06-12 Thread GitBox
arganzheng opened a new pull request #8717: fix issue 
https://issues.apache.org/jira/browse/FLINK-12725
URL: https://github.com/apache/flink/pull/8717
 
 
   
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## What is the purpose of the change
   
   fix issue https://issues.apache.org/jira/browse/FLINK-12725. No need to copy 
the flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib location.
   
   ## Brief change log
   
   change the classLoader of 
`org.apache.flink.api.java.typeutils.WritableTypeInfo` from 
`TypeExtractor.class.getClassLoader()` to 
`Thread.currentThread().getContextClassLoader()`.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12814) Support a traditional and scrolling view of result (non-interactive)

2019-06-12 Thread Zhenghua Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-12814:
-
Description: In table mode, we want to introduce a non-interactive view 
(so-called FinalizedResult), which submits SQL statements (DQLs) in attach mode 
with a user-defined timeout, fetches results until the job has 
finished/failed/timed out or is interrupted by the user (Ctrl+C), and outputs them in a 
non-interactive way (the behavior in change-log mode is under discussion)  
(was: In table mode, we want to introduce a non-interactive view (so-called 
FinalizedResult), which submits SQL statements (DQLs) in attach mode with a 
user-defined timeout, fetches results until the job has finished/failed/timed out or 
is interrupted by the user (Ctrl+C), and outputs them in a non-interactive way (the 
behavior in change-log mode is under discussion)

 

!image-2019-06-12-16-11-06-070.png!)

> Support a traditional and scrolling view of result (non-interactive)
> 
>
> Key: FLINK-12814
> URL: https://issues.apache.org/jira/browse/FLINK-12814
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Affects Versions: 1.8.0
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-06-12-16-11-06-070.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In table mode, we want to introduce a non-interactive view (so-called 
> FinalizedResult), which submits SQL statements (DQLs) in attach mode with a 
> user-defined timeout, fetches results until the job has finished/failed/timed out 
> or is interrupted by the user (Ctrl+C), and outputs them in a non-interactive way 
> (the behavior in change-log mode is under discussion)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12717) Add windows support for the Python shell script

2019-06-12 Thread Wei Zhong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Zhong reassigned FLINK-12717:
-

Assignee: Wei Zhong

> Add windows support for the Python shell script
> ---
>
> Key: FLINK-12717
> URL: https://issues.apache.org/jira/browse/FLINK-12717
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Wei Zhong
>Priority: Major
>
> We should add a windows shell script for pyflink-gateway-server.sh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293195888
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link ShuffleEnvironment} to allow tracking of partitions per job.
+ */
+public class JobAwareShuffleEnvironmentImpl implements 
JobAwareShuffleEnvironment {
+
+   private static final Consumer NO_OP_NOTIFIER = 
partitionId -> {};
+
+   private final ShuffleEnvironment backingShuffleEnvironment;
+   private final PartitionTable inProgressPartitionTable = new 
PartitionTable();
+   private final PartitionTable finishedPartitionTable = new 
PartitionTable();
+
+   /** Tracks which jobs are still being monitored, to ensure cleanup 
in cases where tasks are finishing while
+* the jobmanager connection is being terminated. This is a concurrent 
map since it is modified by both the
+* Task (via {@link #notifyPartitionFinished(JobID, 
ResultPartitionID)}} and
+* TaskExecutor (via {@link 
#releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID)}) thread. */
+   private final Set activeJobs = ConcurrentHashMap.newKeySet();
+
+   public JobAwareShuffleEnvironmentImpl(ShuffleEnvironment 
backingShuffleEnvironment) {
+   this.backingShuffleEnvironment = 
Preconditions.checkNotNull(backingShuffleEnvironment);
+   }
+
+   @Override
+   public boolean hasPartitionsOccupyingLocalResources(JobID jobId) {
+   return inProgressPartitionTable.hasTrackedPartitions(jobId) || 
finishedPartitionTable.hasTrackedPartitions(jobId);
+   }
+
+   @Override
+   public void markJobActive(JobID jobId) {
+   activeJobs.add(jobId);
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection resultPartitionIds) {
+   finishedPartitionTable.stopTrackingPartitions(jobId, 
resultPartitionIds);
+   backingShuffleEnvironment.releasePartitions(resultPartitionIds);
+   }
+
+   @Override
+   public void releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID 
jobId) {
+   activeJobs.remove(jobId);
+   Collection finishedPartitionsForJob = 
finishedPartitionTable.stopTrackingPartitions(jobId);
+   
backingShuffleEnvironment.releasePartitions(finishedPartitionsForJob);
+   }
+
+   /**
+* This method wraps partition writers for externally managed 
partitions and introduces callbacks into the lifecycle
+* methods of the {@link ResultPartitionWriter}.
+*/
+   @Override
+   public Collection 
createResultPartitionWriters(
+   JobID jobId,
+   String taskName,
+   ExecutionAttemptID executionAttemptID,
+   Collection 

[GitHub] [flink] zjuwangg commented on issue #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-12 Thread GitBox
zjuwangg commented on issue #8703: [FLINK-12807][hive]Support Hive table 
columnstats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8703#issuecomment-501536516
 
 
   cc @bowenli86 @xuefuz @lirui-apache  to review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] stevenzwu commented on issue #8665: [FLINK-12781] [Runtime/REST] include the whole stack trace in response payload

2019-06-12 Thread GitBox
stevenzwu commented on issue #8665: [FLINK-12781] [Runtime/REST] include the 
whole stack trace in response payload
URL: https://github.com/apache/flink/pull/8665#issuecomment-501535542
 
 
   latest travis build seems to fail with build timeout


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12823) PartitionTransformation supports DataExchangeMode property

2019-06-12 Thread Jingsong Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862690#comment-16862690
 ] 

Jingsong Lee commented on FLINK-12823:
--

(y)(y)(y) The basic requirement is that batch jobs need to be set up with 
BLOCKING ResultPartitionType to avoid diamond-shaped DAG deadlocks.

NIT: I think StreamTransformation is more accurate than DataStream.

> PartitionTransformation supports DataExchangeMode property
> --
>
> Key: FLINK-12823
> URL: https://issues.apache.org/jira/browse/FLINK-12823
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently the {{ResultPartitionType}} of a {{DataStream}} job is hard-coded to 
> {{PIPELINED_BOUNDED}}. Since {{DataStream}} would support batch mode, there 
> should be a way to set the {{ResultPartitionType}}.
> So I propose introducing a new property {{DataExchangeMode}} of 
> {{PartitionTransformation}}. As a first step, this would be an internal API. 
> Maybe it would be exposed as a public API in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293193273
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link ShuffleEnvironment} to allow tracking of partitions per job.
+ */
+public class JobAwareShuffleEnvironmentImpl implements 
JobAwareShuffleEnvironment {
+
+   private static final Consumer NO_OP_NOTIFIER = 
partitionId -> {};
+
+   private final ShuffleEnvironment backingShuffleEnvironment;
+   private final PartitionTable inProgressPartitionTable = new 
PartitionTable();
+   private final PartitionTable finishedPartitionTable = new 
PartitionTable();
 
 Review comment:
   I got it, thanks for this explanation.
   I think it would be better to add a comment here explaining this 
consideration around releasing finished partitions, so that it is clear to 
others reading the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2019-06-12 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862673#comment-16862673
 ] 

Yun Tang commented on FLINK-8871:
-

[~pnowojski] I noticed that you are also refactoring the checkpoint barrier 
handler in [FLINK-11875|https://issues.apache.org/jira/browse/FLINK-11875], and 
I ran into conflicts when I created the PR. Would you please help review the 
design doc and the related PR, and comment on whether my work would conflict 
with your design? Any suggestions are welcome. CC [~sunhaibotb]

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer account for the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> as cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints already started, while the abandoned checkpoint 
> thread from a previous checkpoint is still lingering around running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method  as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8687: [FLINK-12612][coordination] Track stored partition on the TaskExecutor

2019-06-12 Thread GitBox
zhijiangW commented on a change in pull request #8687: 
[FLINK-12612][coordination] Track stored partition on the TaskExecutor 
URL: https://github.com/apache/flink/pull/8687#discussion_r293189418
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
 ##
 @@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link ShuffleEnvironment} to allow tracking of partitions per job.
+ */
+public class JobAwareShuffleEnvironmentImpl implements 
JobAwareShuffleEnvironment {
+
+   private final ShuffleEnvironment backingShuffleEnvironment;
+
+   private final PartitionTable inProgressPartitionTable = new 
PartitionTable();
+   private final PartitionTable finishedPartitionTable = new 
PartitionTable();
+
+   private Consumer listener = jobId -> {};
+
+   /** Tracks which jobs are still being monitored, to ensure cleanup 
in cases where tasks are finishing while
+* the jobmanager connection is being terminated. This is a concurrent 
map since it is modified by both the
+* Task (via {@link #notifyPartitionFinished(JobID, 
ResultPartitionID)}} and
+* TaskExecutor (via {@link 
#releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID)}) thread. */
+   private final Set activeJobs = ConcurrentHashMap.newKeySet();
+
+   public JobAwareShuffleEnvironmentImpl(ShuffleEnvironment 
backingShuffleEnvironment) {
+   this.backingShuffleEnvironment = 
Preconditions.checkNotNull(backingShuffleEnvironment);
+   }
+
+   @Override
+   public void setPartitionFailedOrFinishedListener(Consumer 
listener) {
+   this.listener = listener;
+   }
+
+   @Override
+   public boolean hasPartitionsOccupyingLocalResources(JobID jobId) {
+   return inProgressPartitionTable.hasTrackedPartitions(jobId) || 
finishedPartitionTable.hasTrackedPartitions(jobId);
+   }
+
+   @Override
+   public void markJobActive(JobID jobId) {
+   activeJobs.add(jobId);
+   }
+
+   @Override
+   public void releaseFinishedPartitions(JobID jobId, 
Collection resultPartitionIds) {
+   finishedPartitionTable.stopTrackingPartitions(jobId, 
resultPartitionIds);
+   backingShuffleEnvironment.releasePartitions(resultPartitionIds);
+   }
+
+   @Override
+   public void releaseAllFinishedPartitionsForJobAndMarkJobInactive(JobID 
jobId) {
+   activeJobs.remove(jobId);
+   Collection finishedPartitionsForJob = 
finishedPartitionTable.stopTrackingPartitions(jobId);
+   
backingShuffleEnvironment.releasePartitions(finishedPartitionsForJob);
+   }
+
+   /**
+* This method wraps partition writers for externally managed 
partitions and introduces callbacks into the lifecycle
+* methods of the {@link ResultPartitionWriter}.
+*/
+   @Override
+   public Collection createResultPartitionWriters(
+   JobID jobId,
+   String 

[jira] [Created] (FLINK-12823) PartitionTransformation supports DataExchangeMode property

2019-06-12 Thread Biao Liu (JIRA)
Biao Liu created FLINK-12823:


 Summary: PartitionTransformation supports DataExchangeMode property
 Key: FLINK-12823
 URL: https://issues.apache.org/jira/browse/FLINK-12823
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
Assignee: Biao Liu
 Fix For: 1.9.0


Currently the {{ResultPartitionType}} of a {{DataStream}} job is hard-coded to 
{{PIPELINED_BOUNDED}}. Since {{DataStream}} would support batch mode, there 
should be a way to set the {{ResultPartitionType}}.
So I propose introducing a new property {{DataExchangeMode}} of 
{{PartitionTransformation}}. As a first step, this would be an internal API. 
Maybe it would be exposed as a public API in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe edited a comment on issue #8707: [FLINK-12815] [table-planner-blink] Supports CatalogManager in blink planner

2019-06-12 Thread GitBox
godfreyhe edited a comment on issue #8707: [FLINK-12815] [table-planner-blink] 
Supports CatalogManager in blink planner
URL: https://github.com/apache/flink/pull/8707#issuecomment-501207481
 
 
   Thanks for your suggestion @dawidwys, I will try to split this PR into 5 
commits as follows:
   1. Unify BatchTableSourceTable and StreamTableSourceTable into 
TableSourceTable
   2. Do not register the intermediate optimization result as a table in batch 
and stream CommonSubGraphBasedOptimizer
   3. Introduce CrudExternalCatalog and related catalog exceptions
   4. TableImpl supports QueryOperation in blink planner
   5. Supports CatalogManager in blink planner
   
   However, the last commit is still very large, because there are 2100+ plan 
test cases, and the result of each plan test case will have the catalog name 
and database name added to its table scan node; the number of changed lines 
involved is 7300+.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12822) Add explicit transformer from SerializableOptional to Optional

2019-06-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-12822:
-
Attachment: FLINK-12822.patch

> Add explicit transformer from SerializableOptional to Optional
> --
>
> Key: FLINK-12822
> URL: https://issues.apache.org/jira/browse/FLINK-12822
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: FLINK-12822.patch
>
>
> We introduced {{SerializableOptional}} to represent an {{Optional}} return 
> value that is transported across the network.
> The purpose is reasonable. However, there is a wart inside 
> {{SerializableOptional}}: calling {{SerializableOptional#map}} returns an 
> {{Optional}}, which might surprise contributors if they want to chain 
> operations and get a {{SerializableOptional}} as the final result (perhaps to 
> transport it again).
> Semantically, the return value of {{SerializableOptional#map}} should be a 
> {{SerializableOptional}}, and for interoperability we could introduce a 
> {{SerializableOptional#toOptional}} method that easily adapts it to the correct type.
> cc the original author [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12822) Add explicit transformer from SerializableOptional to Optional

2019-06-12 Thread TisonKun (JIRA)
TisonKun created FLINK-12822:


 Summary: Add explicit transformer from SerializableOptional to 
Optional
 Key: FLINK-12822
 URL: https://issues.apache.org/jira/browse/FLINK-12822
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Affects Versions: 1.9.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.9.0


We introduced {{SerializableOptional}} to represent an {{Optional}} return 
value that is transported across the network.

The purpose is reasonable. However, there is a wart inside {{SerializableOptional}}: 
calling {{SerializableOptional#map}} returns an {{Optional}}, which might 
surprise contributors if they want to chain operations and get a 
{{SerializableOptional}} as the final result (perhaps to transport it again).

Semantically, the return value of {{SerializableOptional#map}} should be a 
{{SerializableOptional}}, and for interoperability we could introduce a 
{{SerializableOptional#toOptional}} method that easily adapts it to the correct type.

cc the original author [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8690: [FLINK-12801][table-planner-blink] set parallelism for batch SQL

2019-06-12 Thread GitBox
asfgit closed pull request #8690: [FLINK-12801][table-planner-blink] set 
parallelism for batch SQL
URL: https://github.com/apache/flink/pull/8690
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12801) Set parallelism for batch SQL

2019-06-12 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12801.
---
   Resolution: Fixed
 Assignee: xupingyong
Fix Version/s: 1.9.0

Fixed in 1.9.0: 6f5425fc9798510fef33ccd7bb81d4b9f59bffa5

> Set parallelism for batch SQL
> -
>
> Key: FLINK-12801
> URL: https://issues.apache.org/jira/browse/FLINK-12801
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: XuPingyong
>Assignee: xupingyong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
>        DataStream user can set parallelism by 
> SingleOutputStreamOperator#setParallelism and DataStreamSink#setParallelism. 
> But SQL users cannot set the parallelism of operators, while the jobGraphs 
> compiled from SQL are usually complex.
>        Now we first set parallelism for batch SQL by config. We introduce two 
> resourceSetting modes:
>        InferMode.NONE:  User can set parallelism to source, sink and other 
> nodes separately.
>        InferMode.ONLY_SOURCE: Relative to  InferMode.NONE, source parallelism 
> can be inferred by source row count.
>         We also introduce ShuffleStage to give adjacent operators the same 
> parallelism, so that there is no data shuffle between them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293182686
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293182526
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Util for any ObjectInspector related inspection and conversion of Hive data 
to/from Flink data.
+ *
+ * Hive ObjectInspector is a group of flexible APIs to inspect value in 
different data representation,
+ * and developers can extend those API as needed, so technically, object 
inspector supports arbitrary data type in java.
+ */
+@Internal
+public class HiveInspectors {
+
+   /**
+* Get conversion for converting Flink object to Hive object from an 
ObjectInspector.
+*/
+   public static HiveObjectConversion getConversion(ObjectInspector 
inspector) {
+   if (inspector instanceof PrimitiveObjectInspector) {
+   if (inspector instanceof JavaBooleanObjectInspector) {
+   if (((JavaBooleanObjectInspector) 
inspector).preferWritable()) {
+   return o -> new 
BooleanWritable((Boolean) o);
+   } else {
+   return IdentityConversion.INSTANCE;
+   }
+   } else if (inspector instanceof 

[GitHub] [flink] JingsongLi commented on issue #8690: [FLINK-12801][table-planner-blink] set parallelism for batch SQL

2019-06-12 Thread GitBox
JingsongLi commented on issue #8690: [FLINK-12801][table-planner-blink] set 
parallelism for batch SQL
URL: https://github.com/apache/flink/pull/8690#issuecomment-501520352
 
 
   LGTM +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293181133
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.functions.hive.conversion.IdentityConversion;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ *  A ScalarFunction implementation that calls Hive's {@link UDF}.
+ */
+@Internal
+public class HiveSimpleUDF extends HiveScalarFunction {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveSimpleUDF.class);
+
+   private transient Method method;
+   private transient GenericUDFUtils.ConversionHelper conversionHelper;
+   private transient HiveObjectConversion[] conversions;
+   private transient boolean allIdentityConverter;
+
+   public HiveSimpleUDF(HiveFunctionWrapper hiveFunctionWrapper) {
+   super(hiveFunctionWrapper);
+   LOG.info("Creating HiveSimpleUDF from '{}'", 
this.hiveFunctionWrapper.getClassName());
+   }
+
+   @Override
+   public void openInternal() {
+   LOG.info("Opening HiveSimpleUDF as '{}'", 
hiveFunctionWrapper.getClassName());
+
+   function = hiveFunctionWrapper.createFunction();
+
+   List typeInfos = new ArrayList<>();
+
+   for (DataType arg : argTypes) {
+   typeInfos.add(HiveTypeUtil.toHiveTypeInfo(arg));
+   }
+
+   try {
+   method = 
function.getResolver().getEvalMethod(typeInfos);
+   returnInspector = 
ObjectInspectorFactory.getReflectionObjectInspector(method.getGenericReturnType(),
+   
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+   ObjectInspector[] argInspectors = new 
ObjectInspector[typeInfos.size()];
+
+   for (int i = 0; i < argTypes.length; i++) {
+   argInspectors[i] = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfos.get(i));
+   }
+
+   conversionHelper = new 
GenericUDFUtils.ConversionHelper(method, argInspectors);
+   conversions = new 
HiveObjectConversion[argInspectors.length];
+   for (int i = 0; i < argInspectors.length; i++) {
+   conversions[i] = 
HiveInspectors.getConversion(argInspectors[i]);
+   }
+
+   allIdentityConverter = Arrays.stream(conversions)
+   .allMatch(conv -> conv instanceof 
IdentityConversion);
+   } catch (Exception e) {
+   throw new FlinkHiveUDFException(
+   String.format("Failed to open HiveSimpleUDF 
from %s", hiveFunctionWrapper.getClassName()), e);
+   }
+   }
+
+   @Override
+

[GitHub] [flink] tzulitai commented on issue #7281: [FLINK-11107] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2019-06-12 Thread GitBox
tzulitai commented on issue #7281: [FLINK-11107] Avoid memory stateBackend to 
create arbitrary folders under HA path when no checkpoint path configured
URL: https://github.com/apache/flink/pull/7281#issuecomment-501519348
 
 
   thanks for the review @aljoscha and @Myasuka for the contribution.
   
   Merging this for 1.9.0 and 1.8.1 ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
JingsongLi commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293180513
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+
+/**
+ * Abstract class to provide more information for Hive {@link UDF} and {@link 
GenericUDF} functions.
+ */
+@Internal
+public abstract class HiveScalarFunction extends ScalarFunction 
implements HiveFunction {
+
+   protected final HiveFunctionWrapper hiveFunctionWrapper;
+
+   protected Object[] constantArguments;
+   protected DataType[] argTypes;
+
+   protected transient UDFType function;
+   protected transient ObjectInspector returnInspector;
+
+   private transient boolean isArgsSingleArray;
+
+   HiveScalarFunction(HiveFunctionWrapper hiveFunctionWrapper) {
+   this.hiveFunctionWrapper = hiveFunctionWrapper;
+   }
+
+   @Override
+   public void setArgumentTypesAndConstants(Object[] constantArguments, 
DataType[] argTypes) {
+   this.constantArguments = constantArguments;
+   this.argTypes = argTypes;
+   }
+
+   @Override
+   public boolean isDeterministic() {
+   try {
+   org.apache.hadoop.hive.ql.udf.UDFType udfType =
+   hiveFunctionWrapper.getUDFClass()
+   
.getAnnotation(org.apache.hadoop.hive.ql.udf.UDFType.class);
+
+   return udfType != null && udfType.deterministic() && 
!udfType.stateful();
+   } catch (ClassNotFoundException e) {
+   throw new FlinkHiveUDFException(e);
+   }
+   }
+
+   @Override
+   public TypeInformation getResultType(Class[] signature) {
+   return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+   getHiveResultType(this.constantArguments, 
this.argTypes));
+   }
+
+   @Override
+   public void open(FunctionContext context) {
+   openInternal();
+   isArgsSingleArray = argTypes.length == 1 && (argTypes[0] 
instanceof CollectionDataType);
 
 Review comment:
   Only an `Object[]` argument should be treated as `isArgsSingleArray`;
   primitive arrays and multisets should not be included.
   I think we should make this check more precise and add tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-12 Thread GitBox
sunjincheng121 commented on issue #8532: [FLINK-12541][REST] Support to submit 
Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#issuecomment-501518337
 
 
   @tillrohrmann this feature is not a crucial feature for Flink 1.9; it's not a 
must-have for Flink 1.9. But I still suggest adding this change to Flink 1.9, 
for the following reasons:
   
   -  API refactoring: this PR is pretty big, as you said, but most of the 
change is refactoring, i.e. 
   the change is: `jars` -> `artifacts`.  And if we agree that the 
change of `jars` -> `artifacts` is the right thing, I suggest making this change in 
Flink 1.9.  :) 
   - REST API for Python: I think you are right, without the REST API we can use 
the client to submit the Python job, but we only need to make a small change to 
support this feature (only 
   300+ lines changed). I suggest adding this to Flink 1.9.  :)  But I agree 
with you that this is nice to have, not a must-have. 
   
   
![image](https://user-images.githubusercontent.com/22488084/59397887-c7fd1500-8dc0-11e9-8a4c-1033a7b7a163.png)
   
   What do you think?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern

2019-06-12 Thread GitBox
flinkbot edited a comment on issue #8715: [FLINK-12821][table][cep] Fix the bug 
that fix time quantifier can not be the last element of a pattern
URL: https://github.com/apache/flink/pull/8715#issuecomment-501261698
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @dawidwys [committer]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 removed a comment on issue #8623: [FLINK-12719][python] Add the Python catalog API

2019-06-12 Thread GitBox
sunjincheng121 removed a comment on issue #8623: [FLINK-12719][python] Add the 
Python catalog API
URL: https://github.com/apache/flink/pull/8623#issuecomment-501509793
 
 
   I restarted the CI... I'll have another look at this PR after the CI turns 
green. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern

2019-06-12 Thread GitBox
dianfu commented on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix 
time quantifier can not be the last element of a pattern
URL: https://github.com/apache/flink/pull/8715#issuecomment-501509825
 
 
   @flinkbot attention @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API

2019-06-12 Thread GitBox
sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python 
catalog API
URL: https://github.com/apache/flink/pull/8623#issuecomment-501509793
 
 
   I restarted the CI... I'll have another look at this PR after the CI turns 
green. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu edited a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script

2019-06-12 Thread GitBox
dianfu edited a comment on issue #8609: [FLINK-12541][container][python] Add 
support for Python jobs in build script
URL: https://github.com/apache/flink/pull/8609#issuecomment-50131
 
 
   @tillrohrmann Thanks a lot for your comments. I have updated the PR and 
addressed one of your concerns. Regarding FLINK_JOB, this option has been 
removed but the documentation had not been updated, so I just corrected the 
documentation here. Feel free to let me know if you feel that change doesn't 
make sense or you prefer to address that in another PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12803) Correct the package name for python API

2019-06-12 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12803:

Description: 
Currently the package name of flink APIs should contain the language name, 
such as:
 * flink-java -> org.apache.flink.api.java
 * flink-scala -> org.apache.flink.api.scala

So I think we should follow the pattern of API package name and correct the 
current python API package name for `flink-python`. But for long-term goal, the 
flink API package name should be:

 * org.apache.flink.api.common.
 * org.apache.flink.api.common.python.
 * org.apache.flink.api.datastream.
 * org.apache.flink.api.datastream.scala.
 * org.apache.flink.api.datastream.python.
 * org.apache.flink.api.table.
 * org.apache.flink.api.table.scala.
 * org.apache.flink.api.table.python.

 So, in this JIRA, we should correct the package name from 
`org.apache.flink.python` ---> `org.apache.flink.api.table.python`

What do you think?

  was:
Currently the package name of flink APIs should contain the language name, 
such as:
 * flink-java -> org.apache.flink.api.java
 * flink-scala -> org.apache.flink.api.scala

So I think we should follow the pattern of API package name and correct the 
current python API package name for `flink-python`. But for long-term goal, the 
flink API package name should be:
 * org.apache.flink.api.common.
 * org.apache.flink.api.common.python.
 * org.apache.flink.api.datastream.
 * org.apache.flink.api.datastream.scala.
 * org.apache.flink.api.datastream.python.
 * org.apache.flink.api.table.
 * org.apache.flink.api.table.scala.
 * org.apache.flink.api.table.python.

 So, in this JIRA, we should correct the package name from 
`org.apache.flink.python` ---> `org.apache.flink.api.table.python`

What do you think?


> Correct the package name for python API
> ---
>
> Key: FLINK-12803
> URL: https://issues.apache.org/jira/browse/FLINK-12803
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the package name of flink APIs should contain the language name, 
> such as:
>  * flink-java -> org.apache.flink.api.java
>  * flink-scala -> org.apache.flink.api.scala
> So I think we should follow the pattern of API package name and correct the 
> current python API package name for `flink-python`. But for long-term goal, 
> the flink API package name should be:
>  * org.apache.flink.api.common.
>  * org.apache.flink.api.common.python.
>  * org.apache.flink.api.datastream.
>  * org.apache.flink.api.datastream.scala.
>  * org.apache.flink.api.datastream.python.
>  * org.apache.flink.api.table.
>  * org.apache.flink.api.table.scala.
>  * org.apache.flink.api.table.python.
>  So, in this JIRA, we should correct the package name from 
> `org.apache.flink.python` ---> `org.apache.flink.api.table.python`
> What do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12803) Correct the package name for python API

2019-06-12 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12803:

Description: 
Currently the package name of flink APIs should contain the language name, 
such as:
 * flink-java -> org.apache.flink.api.java
 * flink-scala -> org.apache.flink.api.scala

So I think we should follow the pattern of API package name and correct the 
current python API package name for `flink-python`. But for long-term goal, the 
flink API package name should be:
 * org.apache.flink.api.common.
 * org.apache.flink.api.common.python.
 * org.apache.flink.api.datastream.
 * org.apache.flink.api.datastream.scala.
 * org.apache.flink.api.datastream.python.
 * org.apache.flink.api.table.
 * org.apache.flink.api.table.scala.
 * org.apache.flink.api.table.python.

 So, in this JIRA, we should correct the package name from 
`org.apache.flink.python` ---> `org.apache.flink.api.table.python`

What do you think?

  was:
Currently the package name of flink APIs should contain the language name, 
such as:
 * flink-java -> org.apache.flink.api.java
 * flink-scala -> org.apache.flink.api.scala

So I think we should follow the pattern of API package name and correct the 
current python API package name for `flink-python`, i.e.,
 * flink-python -> `org.apache.flink.python` ---> 
`org.apache.flink.api.python`

What do you think?


> Correct the package name for python API
> ---
>
> Key: FLINK-12803
> URL: https://issues.apache.org/jira/browse/FLINK-12803
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the package name of flink APIs should contain the language name, 
> such as:
>  * flink-java -> org.apache.flink.api.java
>  * flink-scala -> org.apache.flink.api.scala
> So I think we should follow the pattern of API package name and correct the 
> current python API package name for `flink-python`. But for long-term goal, 
> the flink API package name should be:
>  * org.apache.flink.api.common.
>  * org.apache.flink.api.common.python.
>  * org.apache.flink.api.datastream.
>  * org.apache.flink.api.datastream.scala.
>  * org.apache.flink.api.datastream.python.
>  * org.apache.flink.api.table.
>  * org.apache.flink.api.table.scala.
>  * org.apache.flink.api.table.python.
>  So, in this JIRA, we should correct the package name from 
> `org.apache.flink.python` ---> `org.apache.flink.api.table.python`
> What do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-06-12 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r293171972
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
+
+   private final int tolerableCpFailureNumber;
+   private final FailJobCallback failureCallback;
+   private final AtomicInteger continuousFailureCounter;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber, 
FailJobCallback failureCallback) {
+   checkArgument(tolerableCpFailureNumber >= 0
+   && tolerableCpFailureNumber <= 
UNLIMITED_TOLERABLE_FAILURE_NUMBER,
+   "The tolerable checkpoint failure number is illegal, " +
+   "it must be greater than or equal to 0 and less 
than or equal to " + UNLIMITED_TOLERABLE_FAILURE_NUMBER + ".");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   this.failureCallback = checkNotNull(failureCallback);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param checkpointId the failed checkpoint id used to count the 
continuous failure number based on
+* checkpoint id sequence. In trigger phase, we may 
not get the checkpoint id when the failure
+* happens before the checkpoint id generation. In 
this case, it will be specified a negative
+*  latest generated checkpoint id as a special 
flag.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
long checkpointId) {
 
 Review comment:
   I have added a check when counting. Since the failure reasons for the 
special case (negative checkpoint ids) are all ignored, it's OK; we do not need 
to handle it specially.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-12 Thread GitBox
xuefuz commented on a change in pull request #8700: [FLINK-12657][hive] 
Integrate Flink with Hive UDF
URL: https://github.com/apache/flink/pull/8700#discussion_r293170708
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.functions.hive.conversion.IdentityConversion;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ *  A ScalarFunction implementation that calls Hive's {@link UDF}.
+ */
+@Internal
+public class HiveSimpleUDF extends HiveScalarFunction {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveSimpleUDF.class);
+
+   private transient Method method;
+   private transient GenericUDFUtils.ConversionHelper conversionHelper;
+   private transient HiveObjectConversion[] conversions;
+   private transient boolean allIdentityConverter;
+
+   public HiveSimpleUDF(HiveFunctionWrapper hiveFunctionWrapper) {
+   super(hiveFunctionWrapper);
+   LOG.info("Creating HiveSimpleUDF from '{}'", 
this.hiveFunctionWrapper.getClassName());
+   }
+
+   @Override
+   public void openInternal() {
+   LOG.info("Opening HiveSimpleUDF as '{}'", 
hiveFunctionWrapper.getClassName());
+
+   function = hiveFunctionWrapper.createFunction();
+
+   List typeInfos = new ArrayList<>();
+
+   for (DataType arg : argTypes) {
+   typeInfos.add(HiveTypeUtil.toHiveTypeInfo(arg));
+   }
+
+   try {
+   method = 
function.getResolver().getEvalMethod(typeInfos);
+   returnInspector = 
ObjectInspectorFactory.getReflectionObjectInspector(method.getGenericReturnType(),
+   
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+   ObjectInspector[] argInspectors = new 
ObjectInspector[typeInfos.size()];
+
+   for (int i = 0; i < argTypes.length; i++) {
+   argInspectors[i] = 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfos.get(i));
+   }
+
+   conversionHelper = new 
GenericUDFUtils.ConversionHelper(method, argInspectors);
+   conversions = new 
HiveObjectConversion[argInspectors.length];
+   for (int i = 0; i < argInspectors.length; i++) {
+   conversions[i] = 
HiveInspectors.getConversion(argInspectors[i]);
+   }
+
+   allIdentityConverter = Arrays.stream(conversions)
+   .allMatch(conv -> conv instanceof 
IdentityConversion);
+   } catch (Exception e) {
+   throw new FlinkHiveUDFException(
+   String.format("Failed to open HiveSimpleUDF 
from %s", hiveFunctionWrapper.getClassName()), e);
+   }
+   }
+
+   @Override
+   

  1   2   3   4   5   >