[jira] [Commented] (FLINK-10143) date_format throws CodeGenException

2018-12-12 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719879#comment-16719879
 ] 

vinoyang commented on FLINK-10143:
--

Hi [~twalthr], we also encountered this problem and then found this issue. Could 
we raise its priority? Alternatively, we could temporarily remove this function 
from the official documentation, or document the problem there. Otherwise it 
will keep causing trouble for users.
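
Until then, a hedged workaround sketch (illustrative, not from the issue
itself): doing the formatting in a user-defined scalar function sidesteps the
DATE_FORMAT code-generation path, provided the sink column is declared as a
String. The class name, pattern, and usage below are made up for the example.

import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;
import org.apache.flink.table.functions.ScalarFunction;

public class FormatTs extends ScalarFunction {
    // mirrors the '%Y%d%m' pattern from the failing query
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyyddMM");

    public String eval(Timestamp ts) {
        return ts == null ? null : ts.toLocalDateTime().format(FMT);
    }
}

// register and call it in place of DATE_FORMAT:
// tableEnv.registerFunction("formatTs", new FormatTs());
// tableEnv.sqlUpdate("insert into mtmp_sink SELECT formatTs(time_str2tms) FROM mtmp_source");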

> date_format throws CodeGenException
> ---
>
> Key: FLINK-10143
> URL: https://issues.apache.org/jira/browse/FLINK-10143
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-08-14-18-11-40-144.png
>
>
> "insert into mtmp_sink SELECT DATE_FORMAT(time_str2tms,'%Y%d%m')  FROM 
> mtmp_source";
> We recently used the date_format function as in the SQL above, but found 
> that it does not work.
> !image-2018-08-14-18-11-40-144.png!
> The error message is:
> org.apache.flink.table.codegen.CodeGenException: Incompatible types of 
> expression and result type. Expression[GeneratedExpression(result$6,isNull$7,
> java.lang.String result$4 = "%Y, %d %M";
> boolean isNull$7 = isNull$3 || false;
> java.lang.String result$6 = "";
> if (!isNull$7) {
>  
>  result$6 = dateFormatter$5.print(result$2);
>  
> isNull$7 = (result$6 == null);
>  
> }
> ,String,false)] type is [String], result type is [Timestamp]
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:380)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:378)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:378)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:336)
>  at 
> org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:45)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
>  at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:999)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:926)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:372)
>  at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
>  at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
>  at 
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)





[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241286479
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.dataview._
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
+import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.RetractableCollector
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.table.codegen.TableAggregationCodeGenerator._
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * A base code generator for generating [[GeneratedTableAggregations]].
+  *
+  * @param config configuration that determines runtime 
behavior
+  * @param nullableInput  input(s) can be null.
+  * @param input  type information about the input of the 
Function
+  * @param constants  constant expressions that act like a second 
input in the
+  *   parameter indices.
+  * @param name   Class name of the function.
+  *   Does not need to be unique but has to be a 
valid Java class
+  *   identifier.
+  * @param physicalInputTypes Physical input row types
+  * @param tableAggOutputTypes Output types of TableAggregateFunction
+  * @param outputSchema   The type of the rows emitted by 
TableAggregate operator
+  * @param aggregates All aggregate functions
+  * @param aggFields  Indexes of the input fields for all 
aggregate functions
+  * @param aggMapping The mapping of aggregates to output fields
+  * @param isDistinctAggs The flag array indicating whether it is 
distinct aggregate.
+  * @param isStateBackedDataViews a flag to indicate if distinct filter uses 
state backend.
+  * @param partialResults A flag defining whether final or partial 
results (accumulators)
+  *   are set
+  *   to the output row.
+  * @param fwdMapping The mapping of input fields to output fields
+  * @param mergeMapping   An optional mapping to specify the 
accumulators to merge. If not
+  *   set, we
+  *   assume that both rows have the accumulators 
at the same position.
+  * @param outputArity The number of fields in the output row.
+  * @param needRetract a flag to indicate if the aggregate needs 
the retract method
+  * @param needEmitWithRetract a flag to indicate if the aggregate needs to 
output retractions
+  *   when update
+  * @param needMerge  a flag to indicate if the aggregate needs 
the merge method
+  * @param needReset  a flag to indicate if the aggregate needs 
the resetAccumulator
+  *   method
+  * @param accConfig  Data view specification for accumulators
+  */
+class TableAggregationCodeGenerator(
+config: TableConfig,
+nullableInput: Boolean,
+input: TypeInformation[_ <: Any],
+constants: Option[Seq[RexLiteral]],
+name: String,
+physicalInputTypes: Seq[TypeInformation[_]],
+tableAggOutputTypes: TypeInformation[_],
+outputSchema: RowSchema,
+aggregates: Array[UserDefinedAggregateFunction[_ <: Any, _ <: Any]],
+aggFields: Array[Array[Int]],
+aggMapping: Array[Int],
+isDistinctAggs: Array[Boolean],
+isStateBackedDataViews: Boolean,
+partialResults: Boolean,
+fwdMapping: Array[Int]

[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241279604
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -550,6 +550,82 @@ val result = orders.distinct()
 
 {% top %}
 
+### TableAggregations
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>GroupBy TableAggregation</strong><br>
+        <span class="label label-primary">Streaming</span><br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+Similar to a GroupBy Aggregation. Groups the rows on the 
grouping keys with a following running table aggregation operator to aggregate 
rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support * and aggregate functions.
 
 Review comment:
   IMO, the select statement can support *
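
  For context, a hedged sketch of the call pattern under discussion, reusing
  the names from the docs example later in this hunk (MyTableAggregateFunction
  is illustrative); whether "*" should be allowed in the closing select is
  exactly the question raised here:

  TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
  tableEnv.registerFunction("myTableAggFunc", tableAggFunc);

  Table result = tableEnv.scan("Orders")
      .groupBy("a")
      .flatAggregate("myTableAggFunc(a, b, c)")
      // the quoted docs require explicit aliases and forbid "*" here;
      // the review suggests "*" could be supported as well
      .select("_1 as a, _2 as b");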




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241281902
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - emitValue or emitValueWithRetract.
+ *
+ * There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * All these methods must be declared publicly, not static, and named 
exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} are defined in
+ * the {@link TableAggregateFunction} functions, while other methods are 
explained below.
+ *
+ * 
+ * {@code
+ * Processes the input values and update the provided accumulator instance. 
The method
+ * accumulate can be overloaded with different custom types and arguments. A 
TableAggregateFunction
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new 
arrived data).
+ *
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design 
assumes the
+ * inputs are the values that have been previously accumulated. The method 
retract can be
+ * overloaded with different custom types and arguments. This function must be 
implemented for
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new 
arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This 
function must be
+ * implemented for data stream session window grouping aggregates and data set 
grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate 
results. It should
+ *be noted that the accumulator may contain the previous 
aggregated
+ *results. Therefore user should not replace or clean this 
instance in the
+ *custom merge method.
+ * param: its an java.lang.Iterable pointed to a group of accumulators 
that will be
+ *merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable iterable)
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be 
implemented for
+ * data set grouping aggregates.
+ *
+ * param: accumulator the accumulator which needs to be reset
+ *
+ * public void resetAccumulator(ACC accumulator)
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Output data incrementally in upsert or append mode. For example, if we emit 
data for a TopN
+ * TableAggregateFunction, we don't have to output all top N elements each 
time a record comes.
+ * It is more efficient to output data incrementally in upsert mode, i.e, only 
output data whose
+ * rank has been changed.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: out   the collector used to output data.
+ *
+ * public void emitValue(ACC accumulator, RetractableCollector out)
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Output data incrementally in retract mode. Once there is an update, we have 
to retract old
+ * records before send new updated ones.
 
 Review comment:
   send new updat
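
  To make the quoted contract concrete, a hedged sketch of a minimal
  TableAggregateFunction (a Top2 over integers). Everything below is
  illustrative and assumes only the signatures shown in this diff:

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.table.functions.TableAggregateFunction;
  import org.apache.flink.util.Collector;

  // Illustrative accumulator: the two largest values seen so far.
  public class Top2Accum {
      public Integer first = Integer.MIN_VALUE;
      public Integer second = Integer.MIN_VALUE;
  }

  // Covers the three mandatory methods named in the Javadoc, plus merge.
  public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

      @Override
      public Top2Accum createAccumulator() {
          return new Top2Accum();
      }

      // At least one accumulate() is required.
      public void accumulate(Top2Accum acc, Integer v) {
          if (v > acc.first) {
              acc.second = acc.first;
              acc.first = v;
          } else if (v > acc.second) {
              acc.second = v;
          }
      }

      // Optional: fold other accumulators into acc without replacing the
      // instance, as the merge contract above demands.
      public void merge(Top2Accum acc, Iterable<Top2Accum> it) {
          for (Top2Accum other : it) {
              accumulate(acc, other.first);
              accumulate(acc, other.second);
          }
      }

      // emitValue may emit 0..n records per group: here (value, rank) pairs.
      public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
          if (acc.first != Integer.MIN_VALUE) {
              out.collect(Tuple2.of(acc.first, 1));
          }
          if (acc.second != Integer.MIN_VALUE) {
              out.collect(Tuple2.of(acc.second, 2));
          }
      }
  }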

[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241293061
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableAggregateTest.scala
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func0
+import org.apache.flink.table.utils.{EmptyTableAggFunc, TableTestBase, TopN}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class TableAggregateTest extends TableTestBase {
+
 
 Review comment:
   Add an alias test




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241280067
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
 
 Review comment:
   an -> a




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241281516
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - emitValue or emitValueWithRetract.
+ *
+ * There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * All these methods must be declared publicly, not static, and named 
exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} are defined in
+ * the {@link TableAggregateFunction} functions, while other methods are 
explained below.
+ *
+ * 
+ * {@code
+ * Processes the input values and update the provided accumulator instance. 
The method
+ * accumulate can be overloaded with different custom types and arguments. A 
TableAggregateFunction
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new 
arrived data).
+ *
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design 
assumes the
+ * inputs are the values that have been previously accumulated. The method 
retract can be
+ * overloaded with different custom types and arguments. This function must be 
implemented for
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new 
arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This 
function must be
+ * implemented for data stream session window grouping aggregates and data set 
grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate 
results. It should
+ *be noted that the accumulator may contain the previous 
aggregated
+ *results. Therefore user should not replace or clean this 
instance in the
+ *custom merge method.
+ * param: its an java.lang.Iterable pointed to a group of accumulators 
that will be
+ *merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable iterable)
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be 
implemented for
 
 Review comment:
   AggregateFunction -> TableAggregateFunction
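
  A hedged one-method sketch of this reset contract, reusing an illustrative
  Top2-style accumulator (field names assumed):

  public void resetAccumulator(Top2Accum acc) {
      // restore the accumulator to its createAccumulator() state for reuse
      acc.first = Integer.MIN_VALUE;
      acc.second = Integer.MIN_VALUE;
  }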




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241285918
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ##
 @@ -295,6 +295,15 @@ abstract class CodeGenerator(
 generateResultExpression(input1AccessExprs ++ input2AccessExprs, 
returnType, resultFieldNames)
   }
 
+  /**
+* Generates an expression from the input.
+*/
+  def generateFieldAccessExprs: Seq[GeneratedExpression] = {
 
 Review comment:
  Is it possible to avoid this change? It seems that you could use 
CodeGenerator#generateConverterResultExpression.




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241286801
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.dataview._
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
+import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.RetractableCollector
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.table.codegen.TableAggregationCodeGenerator._
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * A base code generator for generating [[GeneratedTableAggregations]].
+  *
+  * @param config configuration that determines runtime 
behavior
+  * @param nullableInput  input(s) can be null.
+  * @param input  type information about the input of the 
Function
+  * @param constants  constant expressions that act like a second 
input in the
+  *   parameter indices.
+  * @param name   Class name of the function.
+  *   Does not need to be unique but has to be a 
valid Java class
+  *   identifier.
+  * @param physicalInputTypes Physical input row types
+  * @param tableAggOutputTypes Output types of TableAggregateFunction
+  * @param outputSchema   The type of the rows emitted by 
TableAggregate operator
+  * @param aggregates All aggregate functions
+  * @param aggFields  Indexes of the input fields for all 
aggregate functions
+  * @param aggMapping The mapping of aggregates to output fields
+  * @param isDistinctAggs The flag array indicating whether it is 
distinct aggregate.
+  * @param isStateBackedDataViews a flag to indicate if distinct filter uses 
state backend.
+  * @param partialResults A flag defining whether final or partial 
results (accumulators)
+  *   are set
+  *   to the output row.
+  * @param fwdMapping The mapping of input fields to output fields
+  * @param mergeMapping   An optional mapping to specify the 
accumulators to merge. If not
+  *   set, we
+  *   assume that both rows have the accumulators 
at the same position.
+  * @param outputArity The number of fields in the output row.
+  * @param needRetract a flag to indicate if the aggregate needs 
the retract method
+  * @param needEmitWithRetract a flag to indicate if the aggregate needs to 
output retractions
+  *   when update
+  * @param needMerge  a flag to indicate if the aggregate needs 
the merge method
+  * @param needReset  a flag to indicate if the aggregate needs 
the resetAccumulator
+  *   method
+  * @param accConfig  Data view specification for accumulators
+  */
+class TableAggregationCodeGenerator(
+config: TableConfig,
+nullableInput: Boolean,
+input: TypeInformation[_ <: Any],
+constants: Option[Seq[RexLiteral]],
+name: String,
+physicalInputTypes: Seq[TypeInformation[_]],
+tableAggOutputTypes: TypeInformation[_],
+outputSchema: RowSchema,
+aggregates: Array[UserDefinedAggregateFunction[_ <: Any, _ <: Any]],
+aggFields: Array[Array[Int]],
+aggMapping: Array[Int],
+isDistinctAggs: Array[Boolean],
+isStateBackedDataViews: Boolean,
+partialResults: Boolean,
+fwdMapping: Array[Int]

[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241280390
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - emitValue or emitValueWithRetract.
+ *
+ * There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * All these methods must be declared publicly, not static, and named 
exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} are defined in
 
 Review comment:
   methods -> method




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241291444
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTableAggregate.scala
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.functions.utils.TableAggSqlFunction
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.util.Logging
+
+/**
+  *
+  * Flink RelNode for data stream unbounded table aggregate
+  *
+  * @param cluster Cluster of the RelNode, represent for an 
environment of related
+  *relational expressions during the optimization of a 
query.
+  * @param traitSetTrait set of the RelNode
+  * @param inputNode   The input RelNode of aggregation
+  * @param schema  The type of the rows emitted by this RelNode
+  * @param inputSchema The type of the rows consumed by this RelNode
+  * @param namedAggregates List of calls to aggregate functions and their 
output field names
+  * @param groupings   The position (in the input Row) of the grouping keys
+  */
+class DataStreamTableAggregate(
 
 Review comment:
    Would it be better to use DataStreamGroupTableAggregate?




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241281679
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - emitValue or emitValueWithRetract.
+ *
+ * There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * All these methods must be declared publicly, not static, and named 
exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} are defined in
+ * the {@link TableAggregateFunction} functions, while other methods are 
explained below.
+ *
+ * 
+ * {@code
+ * Processes the input values and update the provided accumulator instance. 
The method
+ * accumulate can be overloaded with different custom types and arguments. A 
TableAggregateFunction
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new 
arrived data).
+ *
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design 
assumes the
+ * inputs are the values that have been previously accumulated. The method 
retract can be
+ * overloaded with different custom types and arguments. This function must be 
implemented for
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new 
arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This 
function must be
+ * implemented for data stream session window grouping aggregates and data set 
grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate 
results. It should
+ *be noted that the accumulator may contain the previous 
aggregated
+ *results. Therefore user should not replace or clean this 
instance in the
+ *custom merge method.
+ * param: its an java.lang.Iterable pointed to a group of accumulators 
that will be
+ *merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable iterable)
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be 
implemented for
+ * data set grouping aggregates.
+ *
+ * param: accumulator the accumulator which needs to be reset
+ *
+ * public void resetAccumulator(ACC accumulator)
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Output data incrementally in upsert or append mode. For example, if we emit 
data for a TopN
+ * TableAggregateFunction, we don't have to output all top N elements each 
time a record comes.
 
 Review comment:
   have to -> need
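
  A hedged sketch of the incremental emission described above, assuming the
  RetractableCollector quoted earlier exposes retract() alongside collect();
  MaxAccum and its fields are illustrative:

  // Accumulator tracking the current max and what was last sent downstream.
  public class MaxAccum {
      public Integer current;
      public Integer lastEmitted; // null until something has been emitted
  }

  // Emit only on change: retract the stale record, then send the new one.
  public void emitValueWithRetract(MaxAccum acc, RetractableCollector<Integer> out) {
      if (acc.current.equals(acc.lastEmitted)) {
          return; // max unchanged, emit nothing
      }
      if (acc.lastEmitted != null) {
          out.retract(acc.lastEmitted); // retract the old record first
      }
      out.collect(acc.current);         // then send the updated one
      acc.lastEmitted = acc.current;
  }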




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241289780
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTableAggregate.scala
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.functions.utils.TableAggSqlFunction
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.util.Logging
+
+/**
+  *
+  * Flink RelNode for data stream unbounded table aggregate
+  *
+  * @param cluster Cluster of the RelNode, represent for an 
environment of related
+  *relational expressions during the optimization of a 
query.
+  * @param traitSetTrait set of the RelNode
+  * @param inputNode   The input RelNode of aggregation
+  * @param schema  The type of the rows emitted by this RelNode
+  * @param inputSchema The type of the rows consumed by this RelNode
+  * @param namedAggregates List of calls to aggregate functions and their 
output field names
+  * @param groupings   The position (in the input Row) of the grouping keys
+  */
+class DataStreamTableAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+schema: RowSchema,
+inputSchema: RowSchema,
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+val groupings: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+with CommonAggregate
+with DataStreamRel
+with Logging {
+
+  override def deriveRowType() = schema.relDataType
+
+  override def needsUpdatesAsRetraction = true
+
+  override def producesUpdates = true
+
+  override def consumesRetractions = true
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+new DataStreamTableAggregate(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  schema,
+  inputSchema,
+  namedAggregates,
+  groupings)
+  }
+
+  override def toString: String = {
+s"TableAggregate(${
+  if (!groupings.isEmpty) {
+s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), "
+  } else {
+""
+  }
+} flatAggregate:(${aggregationToString(
+  inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(
+inputSchema.relDataType, groupings), !groupings.isEmpty)
+  .item("flatAggregate", aggregationToString(
+inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil))
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS = 
getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+val aggCall = namedAggregates(0).left
+
+val processFunction = AggregateUtil.createTableAggregateFunction(
+  tableEnv.getConfig,
+  false,
+  inputSchema.typeInfo,
+  None,
+  namedAggregates,
+  inputSchema.relDataType,
+  inputSchema.fieldTypeInfos,
+  

[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241293985
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableAggregateITCase.scala
 ##
 @@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, Types}
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.utils.{Top3WithRetractInput, TopN}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Tests of groupby (without window) aggregations
+  */
+class TableAggregateITCase extends StreamingWithStateTestBase {
 
 Review comment:
   Add a distinct ITCase




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241287190
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
 ##
 @@ -71,6 +71,16 @@ case class GeneratedAggregationsFunction(
 name: String,
 code: String)
 
+/**
+  * Describes a generated table aggregate helper function
+  *
+  * @param name class name of the generated Function.
+  * @param code code of the generated Function.
+  */
+case class GeneratedTableAggregationsFunction(
 
 Review comment:
   Maybe we can use GeneratedAggregationsFunction?




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241290354
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.dataview._
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
+import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.RetractableCollector
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.table.codegen.TableAggregationCodeGenerator._
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * A base code generator for generating [[GeneratedTableAggregations]].
+  *
+  * @param config configuration that determines runtime 
behavior
+  * @param nullableInput  input(s) can be null.
+  * @param input  type information about the input of the 
Function
+  * @param constants  constant expressions that act like a second 
input in the
+  *   parameter indices.
+  * @param name   Class name of the function.
+  *   Does not need to be unique but has to be a 
valid Java class
+  *   identifier.
+  * @param physicalInputTypes Physical input row types
+  * @param tableAggOutputTypes Output types of TableAggregateFunction
+  * @param outputSchema   The type of the rows emitted by 
TableAggregate operator
+  * @param aggregates All aggregate functions
+  * @param aggFields  Indexes of the input fields for all 
aggregate functions
+  * @param aggMapping The mapping of aggregates to output fields
+  * @param isDistinctAggs The flag array indicating whether it is 
distinct aggregate.
+  * @param isStateBackedDataViews a flag to indicate if distinct filter uses 
state backend.
+  * @param partialResults A flag defining whether final or partial 
results (accumulators)
+  *   are set
+  *   to the output row.
+  * @param fwdMapping The mapping of input fields to output fields
+  * @param mergeMapping   An optional mapping to specify the 
accumulators to merge. If not
+  *   set, we
+  *   assume that both rows have the accumulators 
at the same position.
+  * @param outputArity The number of fields in the output row.
+  * @param needRetract a flag to indicate if the aggregate needs 
the retract method
+  * @param needEmitWithRetract a flag to indicate if the aggregate needs to 
output retractions
+  *   when update
+  * @param needMerge  a flag to indicate if the aggregate needs 
the merge method
+  * @param needReset  a flag to indicate if the aggregate needs 
the resetAccumulator
+  *   method
+  * @param accConfig  Data view specification for accumulators
+  */
+class TableAggregationCodeGenerator(
+config: TableConfig,
+nullableInput: Boolean,
+input: TypeInformation[_ <: Any],
+constants: Option[Seq[RexLiteral]],
+name: String,
+physicalInputTypes: Seq[TypeInformation[_]],
+tableAggOutputTypes: TypeInformation[_],
+outputSchema: RowSchema,
 
 Review comment:
   tableAggOutputTypes can be computed from outputSchema



[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241279917
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -550,6 +550,82 @@ val result = orders.distinct()
 
 {% top %}
 
+### TableAggregations
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>GroupBy TableAggregation</strong><br>
+        <span class="label label-primary">Streaming</span><br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+Similar to a GroupBy Aggregation. Groups the rows on the 
grouping keys with a following running table aggregation operator to aggregate 
rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support * and aggregate functions.
+{% highlight java %}
+TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
+tableEnv.registerFunction("myTableAggFunc", tableAggFunc);
+Table orders = tableEnv.scan("Orders");
+Table result = orders
+.groupBy("a")
+.flatAggregate("myTableAggFunc(a, b, c)")
+.select("_1 as a, _2 as b");
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the type of aggregation and 
the number of distinct grouping keys. Please provide a query configuration with 
valid retention interval to prevent excessive state size. See Query Configuration for 
details.
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>GroupBy TableAggregation</strong><br>
+        <span class="label label-primary">Streaming</span><br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+Similar to a GroupBy Aggregation. Groups the rows on the 
grouping keys with a following running table aggregation operator to aggregate 
rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support * and aggregate functions.
 
 Review comment:
   IMO, the select statement can support *
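
  As an aside on the state-retention note quoted above, a hedged sketch of
  supplying such a query configuration (tableEnv and result are assumed to
  exist; the retention values are illustrative):

  StreamQueryConfig qConfig = tableEnv.queryConfig();
  // evict idle per-key state after 12-24 hours so the flatAggregate state
  // cannot grow without bound
  qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
  DataStream<Tuple2<Boolean, Row>> stream =
      tableEnv.toRetractStream(result, Row.class, qConfig);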




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241284274
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -358,8 +358,12 @@ abstract class StreamTableEnvironment(
 tableKeys match {
   case Some(keys) => upsertSink.setKeyFields(keys)
   case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
-  case None if !isAppendOnlyTable => throw new TableException(
-"UpsertStreamTableSink requires that Table has full primary keys 
if it is updated.")
+  case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == 
null =>
+throw new TableException(
+  "UpsertStreamTableSink requires that Table has full primary keys 
if it is updated. " +
+"You can enforce your sink keys by override enforceKeyFields 
method in " +
+"UpsertStreamTableSink interface if you really want to, but be 
carefull.")
 
 Review comment:
   carefull -> careful




[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241286060
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.dataview._
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.functions.UserDefinedAggregateFunction
+import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.RetractableCollector
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.table.codegen.TableAggregationCodeGenerator._
+import org.apache.flink.table.plan.schema.RowSchema
+
+/**
+  * A base code generator for generating [[GeneratedTableAggregations]].
+  *
+  * @param config                 configuration that determines runtime behavior
+  * @param nullableInput          input(s) can be null.
+  * @param input                  type information about the input of the Function
+  * @param constants              constant expressions that act like a second input in the
+  *                               parameter indices.
+  * @param name                   Class name of the function. Does not need to be unique
+  *                               but has to be a valid Java class identifier.
+  * @param physicalInputTypes     Physical input row types
+  * @param tableAggOutputTypes    Output types of TableAggregateFunction
+  * @param outputSchema           The type of the rows emitted by the TableAggregate operator
+  * @param aggregates             All aggregate functions
+  * @param aggFields              Indexes of the input fields for all aggregate functions
+  * @param aggMapping             The mapping of aggregates to output fields
+  * @param isDistinctAggs         The flag array indicating whether it is a distinct aggregate.
+  * @param isStateBackedDataViews a flag to indicate if distinct filters use the state backend.
+  * @param partialResults         A flag defining whether final or partial results
+  *                               (accumulators) are set to the output row.
+  * @param fwdMapping             The mapping of input fields to output fields
+  * @param mergeMapping           An optional mapping to specify the accumulators to merge.
+  *                               If not set, we assume that both rows have the accumulators
+  *                               at the same position.
+  * @param outputArity            The number of fields in the output row.
+  * @param needRetract            a flag to indicate if the aggregate needs the retract method
+  * @param needEmitWithRetract    a flag to indicate if the aggregate needs to output
+  *                               retractions when updating
+  * @param needMerge              a flag to indicate if the aggregate needs the merge method
+  * @param needReset              a flag to indicate if the aggregate needs the
+  *                               resetAccumulator method
+  * @param accConfig              Data view specification for accumulators
+  */
+class TableAggregationCodeGenerator(
+config: TableConfig,
+nullableInput: Boolean,
+input: TypeInformation[_ <: Any],
+constants: Option[Seq[RexLiteral]],
+name: String,
+physicalInputTypes: Seq[TypeInformation[_]],
+tableAggOutputTypes: TypeInformation[_],
+outputSchema: RowSchema,
+aggregates: Array[UserDefinedAggregateFunction[_ <: Any, _ <: Any]],
+aggFields: Array[Array[Int]],
+aggMapping: Array[Int],
+isDistinctAggs: Array[Boolean],
+isStateBackedDataViews: Boolean,
+partialResults: Boolean,
+fwdMapping: Array[Int]

[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241281005
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - emitValue or emitValueWithRetract.
+ *
+ * There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * All these methods must be declared publicly, not static, and named 
exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} are defined in
+ * the {@link TableAggregateFunction} functions, while other methods are 
explained below.
+ *
+ * 
+ * {@code
+ * Processes the input values and updates the provided accumulator instance. The method
The method
+ * accumulate can be overloaded with different custom types and arguments. A 
TableAggregateFunction
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator   the accumulator which contains the current 
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly 
arrived data).
+ *
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * 
+ *
+ * 
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design 
assumes the
+ * inputs are the values that have been previously accumulated. The method 
retract can be
+ * overloaded with different custom types and arguments. This function must be 
implemented for
+ * data stream bounded OVER aggregates.
 
 Review comment:
   TableAggregateFunction is not supported in OVER.
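   As a sketch of the accumulate/retract pair described in the quoted javadoc 
(a hypothetical count aggregate; the class shape and method bodies are 
illustrative, not the actual implementation):

```java
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// Hypothetical count aggregate: retract undoes a previous accumulate call
// on the same accumulator, which is what retraction streams require.
public class CountAgg extends TableAggregateFunction<Long, CountAgg.CountAccum> {

    public static class CountAccum {
        public long count;
    }

    @Override
    public CountAccum createAccumulator() {
        return new CountAccum();
    }

    public void accumulate(CountAccum acc, String input) {
        acc.count += 1;  // one newly arrived record
    }

    public void retract(CountAccum acc, String input) {
        acc.count -= 1;  // a previously accumulated record is withdrawn
    }

    public void emitValue(CountAccum acc, Collector<Long> out) {
        out.collect(acc.count);
    }
}
```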


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241280457
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - emitValue or emitValueWithRetract.
+ *
+ * There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator.
+ *
+ * All these methods must be declared publicly, not static, and named 
exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} are defined in
+ * the {@link TableAggregateFunction} functions, while other methods are 
explained below.
+ *
 
 Review comment:
   TableAggregateFunction -> UserDefinedAggregateFunction. The comments in 
AggregateFunction should also be updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241289558
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTableAggregate.scala
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.functions.utils.TableAggSqlFunction
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.util.Logging
+
+/**
+  * Flink RelNode for a data stream unbounded table aggregate.
+  *
+  * @param cluster         Cluster of the RelNode, representing an environment of related
+  *                        relational expressions during the optimization of a query.
+  * @param traitSet        Trait set of the RelNode
+  * @param inputNode       The input RelNode of the aggregation
+  * @param schema          The type of the rows emitted by this RelNode
+  * @param inputSchema     The type of the rows consumed by this RelNode
+  * @param namedAggregates List of calls to aggregate functions and their output field names
+  * @param groupings       The position (in the input Row) of the grouping keys
+  */
+class DataStreamTableAggregate(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+schema: RowSchema,
+inputSchema: RowSchema,
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+val groupings: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+with CommonAggregate
+with DataStreamRel
+with Logging {
+
+  override def deriveRowType() = schema.relDataType
+
+  override def needsUpdatesAsRetraction = true
+
+  override def producesUpdates = true
+
+  override def consumesRetractions = true
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+new DataStreamTableAggregate(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  schema,
+  inputSchema,
+  namedAggregates,
+  groupings)
+  }
+
+  override def toString: String = {
+s"TableAggregate(${
+  if (!groupings.isEmpty) {
+s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), "
+  } else {
+""
+  }
+} flatAggregate:(${aggregationToString(
+  inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(
+inputSchema.relDataType, groupings), !groupings.isEmpty)
+  .item("flatAggregate", aggregationToString(
+inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil))
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
 
 Review comment:
   We can add a state retention warning like the one in DataStreamGroupAggregate.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241283978
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/RetractableCollector.java
 ##
 @@ -16,21 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sinks
+package org.apache.flink.table.utils;
 
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.Table
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Collector;
 
 /**
-  * Defines an external [[TableSink]] to emit streaming [[Table]] with only 
insert changes.
-  *
-  * If the [[Table]] is also modified by update or delete changes, a
-  * [[org.apache.flink.table.api.TableException]] will be thrown.
-  *
-  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and 
supports.
-  */
-trait AppendStreamTableSink[T] extends StreamTableSink[T] {
+ * Collects a record and forwards it. The collector can output retract 
messages with the retract method.
+ * Currently, RetractableCollector is used in TableAggregateFunction.
 
 Review comment:
   RetractableCollector can only be used in TableAggregateFunction.
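   For illustration, a minimal sketch of how an emitValueWithRetract method 
might use this collector. Only collect() and retract() come from the javadoc 
above; the accumulator shape and field names are hypothetical:

```java
// Hypothetical accumulator: holds the current max and what was last emitted.
public static class MaxAccum {
    public Long max;
    public Long lastEmitted;
}

// Retract the previously emitted result before collecting the updated one,
// so downstream consumers see a clean update instead of duplicates.
public void emitValueWithRetract(MaxAccum acc, RetractableCollector<Long> out) {
    if (acc.lastEmitted != null) {
        out.retract(acc.lastEmitted);  // withdraw the stale result
    }
    out.collect(acc.max);              // forward the updated result
    acc.lastEmitted = acc.max;
}
```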


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API

2018-12-12 Thread GitBox
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add 
UnBounded FlatAggregate operator to streaming Table API
URL: https://github.com/apache/flink/pull/7209#discussion_r241280112
 
 

 ##
 File path: 
flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for user-defined table aggregates.
+ *
+ * The behavior of an {@link TableAggregateFunction} can be defined by 
implementing a series of custom
+ * methods. An {@link TableAggregateFunction} needs at least three methods:
 
 Review comment:
   An -> A


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on issue #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2018-12-12 Thread GitBox
zhijiangW commented on issue #7199: [FLINK-10662][network] Refactor the 
ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#issuecomment-446865059
 
 
   Thanks for your reviews! @pnowojski 
   
   I will consider your suggestions above and double-check the Travis failure.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2018-12-12 Thread GitBox
zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] 
Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r241289363
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -68,20 +68,29 @@
 
private final boolean flushAlways;
 
+   private final boolean isBroadcastSelector;
+
private Counter numBytesOut = new SimpleCounter();
 
private Counter numBuffersOut = new SimpleCounter();
 
public RecordWriter(ResultPartitionWriter writer) {
-   this(writer, new RoundRobinChannelSelector());
+   this(writer, new RoundRobinChannelSelector(), false);
}
 
-   @SuppressWarnings("unchecked")
-   public RecordWriter(ResultPartitionWriter writer, ChannelSelector 
channelSelector) {
-   this(writer, channelSelector, false);
+   public RecordWriter(
 
 Review comment:
   To be honest, I am also a bit embarrassed by the current implementation. It 
does have the redundancy and potential inconsistency issues you mentioned.
   
   The first way you mentioned can solve the above issues if we can check 
internally in `RecordWriter` whether the selector is a broadcast selector or not.
   
   For the second way, we cannot pass only the boolean argument without the 
selector, because once the boolean value is false, we have to rely on the 
specific selector to get the channels. So the selector has to be given 
explicitly, but the boolean can be derived implicitly from the selector.
   
   I also thought of another way before: defining a separate 
`BroadcastRecordWriter` that handles broadcasting directly. The writers would 
then be split into the common `RecordWriter` and the special 
`BroadcastRecordWriter`. In the common `RecordWriter`, the selector is needed 
for routing records to channels, while `BroadcastRecordWriter` does not need a 
selector at all. To do so we may need to further extract the common code of 
these two writers; furthermore, we may need to remove the current 
`StreamRecordWriter` and integrate it into the current `RecordWriter` directly. 
The writers would then be divided by broadcast mode, not by stream mode. What do 
you think of this way?
   
   I will also think further through the changes for the above two options.
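   For illustration, a rough sketch of that third option. The class shapes are 
hypothetical, not the actual flink-runtime code, and the ChannelSelector 
interface is reduced to a single method:

```java
// Minimal selector contract: pick one target channel per record.
interface ChannelSelector<T> {
    int selectChannel(T record, int numChannels);
}

// Common writer: routes each record to the single selected channel.
class RecordWriter<T> {
    protected final int numChannels;
    private final ChannelSelector<T> selector;

    RecordWriter(int numChannels, ChannelSelector<T> selector) {
        this.numChannels = numChannels;
        this.selector = selector;
    }

    public void emit(T record) {
        sendToTarget(record, selector.selectChannel(record, numChannels));
    }

    protected void sendToTarget(T record, int channel) {
        // serialization and buffer handling would live here
    }
}

// Broadcast writer: needs no selector, copies every record to all channels.
class BroadcastRecordWriter<T> extends RecordWriter<T> {
    BroadcastRecordWriter(int numChannels) {
        super(numChannels, null);  // selector unused in broadcast mode
    }

    @Override
    public void emit(T record) {
        for (int channel = 0; channel < numChannels; channel++) {
            sendToTarget(record, channel);
        }
    }
}
```
   With such a split, the boolean flag disappears and each writer only carries 
the state it actually needs.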


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11128) Methods of org.apache.flink.table.expressions.Expression are private[flink]

2018-12-12 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719822#comment-16719822
 ] 

vinoyang edited comment on FLINK-11128 at 12/13/18 6:45 AM:


[~dfakhritdinov] I tried it in a new project. However, it did not cause any 
problem. What exactly do you mean by "impossible"?


was (Author: yanghua):
[~dfakhritdinov] I tried it. However, it did not cause any problem. What 
exactly do you mean by "impossible"?

> Methods of org.apache.flink.table.expressions.Expression are private[flink]
> ---
>
> Key: FLINK-11128
> URL: https://issues.apache.org/jira/browse/FLINK-11128
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Denys Fakhritdinov
>Assignee: vinoyang
>Priority: Critical
>
> Sources with push-down are part of the public API: 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#defining-a-tablesource-with-filter-push-down]
>  
> {{But all methods of org.apache.flink.table.expressions.Expression are 
> *{color:#FF}private[flink]{color}*, which makes it impossible to use.}}
>  
> {{Please, change private[flink] to public.}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11128) Methods of org.apache.flink.table.expressions.Expression are private[flink]

2018-12-12 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719822#comment-16719822
 ] 

vinoyang commented on FLINK-11128:
--

[~dfakhritdinov] I tried it. However, it did not cause any problem. What 
exactly do you mean by "impossible"?

> Methods of org.apache.flink.table.expressions.Expression are private[flink]
> ---
>
> Key: FLINK-11128
> URL: https://issues.apache.org/jira/browse/FLINK-11128
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Denys Fakhritdinov
>Assignee: vinoyang
>Priority: Critical
>
> Sources with push-down are part of the public API: 
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#defining-a-tablesource-with-filter-push-down]
>  
> {{But all methods of org.apache.flink.table.expressions.Expression are 
> *{color:#FF}private[flink]{color}*, which makes it impossible to use.}}
>  
> {{Please, change private[flink] to public.}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11062) web ui log error

2018-12-12 Thread lining (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining closed FLINK-11062.
--
Resolution: Fixed

> web ui log error
> 
>
> Key: FLINK-11062
> URL: https://issues.apache.org/jira/browse/FLINK-11062
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.7.0
>Reporter: lining
>Priority: Major
> Attachments: image-2018-12-04-16-50-45-092.png, 
> image-2018-12-04-16-53-23-887.png
>
>
> !image-2018-12-04-16-53-23-887.png!
>  
>  
> Viewing the log fails with an error; the reason is net::ERR_CONTENT_LENGTH_MISMATCH



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on issue #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-12 Thread GitBox
dianfu commented on issue #7253: [FLINK-11074] [table][tests] Enable harness 
tests with RocksdbStateBackend and add harness tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#issuecomment-446850114
 
 
   @twalthr @sunjincheng121 Could you help to take a look at this PR? You can 
see from the harness test of #7286 that this change makes it very easy to write 
a new harness test. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-9462) Disentangle flink-json and flink-table

2018-12-12 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-9462:
--

Assignee: boshu Zheng

> Disentangle flink-json and flink-table
> --
>
> Key: FLINK-9462
> URL: https://issues.apache.org/jira/browse/FLINK-9462
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: boshu Zheng
>Priority: Major
> Fix For: 1.8.0
>
>
> The {{flink-json}} module defines Json serialization and deserialization 
> schemas. Additionally, it defines Json table descriptor. Due to this, it has 
> a dependency on {{flink-table}}. We should either rename this module into 
> {{flink-json-table}} or move the table API specific classes into a different 
> module. That way we could remove the dependency on {{flink-table}} which 
> decouples the Json serialization and deserialization schemas from the Table 
> API on which the schemas should not depend.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table

2018-12-12 Thread GitBox
hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace 
registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-446846572
 
 
   @zjffdu Thanks for the update. I will take a look within the next week.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11035) Notify data available to network stack immediately after finishing BufferBuilder

2018-12-12 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719746#comment-16719746
 ] 

zhijiang commented on FLINK-11035:
--

[~pnowojski], thanks for your explanation! I was just not sure whether you had 
other thoughts on this issue before, and I am clear now.

If a flush is triggered between two {{BufferBuilder}} add operations, then we 
can benefit from the current design. In other words, the flush thread helps us 
do the notification work to some extent. But the flush timeout is configurable 
and not very deterministic, and batch jobs have no flush mechanism at all. So 
it cuts both ways, and the disadvantage may be just a corner case.

I have not found the problem in the real world yet, only when we mock such 
special scenarios. So this improvement is not critical for now; we can keep the 
current logic until we have new findings. :)

> Notify data available to network stack immediately after finishing 
> BufferBuilder
> 
>
> Key: FLINK-11035
> URL: https://issues.apache.org/jira/browse/FLINK-11035
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The data availability notification for the network relies on whether there is 
> a finished _BufferBuilder_ or a flush has been triggered. If no flush is 
> triggered and the first _BufferBuilder_ is enqueued into the subpartition, 
> then although this _BufferBuilder_ is finished on the _RecordWriter_ side, it 
> has to rely on the enqueueing of the second _BufferBuilder_ to trigger the 
> availability notification. This may delay the transport of the finished 
> _BufferBuilder_ in the network, especially when there is a blocking operation 
> for requesting the second _BufferBuilder_ from the pool.
> Supposing, in an extreme scenario, there is only one available buffer in the 
> LocalBufferPool: if the first _BufferBuilder_ is not transported and recycled, 
> the request for the second _BufferBuilder_ will be blocked forever.
> I propose to add a _notifyBufferFinished_ method to the _ResultPartitionWriter_ 
> interface, so that _RecordWriter_ can notify via it after a _BufferBuilder_ is 
> finished.
>  
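A sketch of the proposed interface change (the method name comes from the 
description above; the exact signature and placement are hypothetical):
{code:java}
// Hypothetical excerpt: the RecordWriter would call notifyBufferFinished right
// after finishing a BufferBuilder, so the consumer is notified without waiting
// for the next buffer to be enqueued.
public interface ResultPartitionWriter {

    // ... existing methods elided ...

    /** Signals that a BufferBuilder for the given subpartition was finished. */
    void notifyBufferFinished(int targetSubpartition);
}
{code}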



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6329: [FLINK-9841] Web UI only show partial taskmanager log

2018-12-12 Thread GitBox
yanghua commented on issue #6329: [FLINK-9841] Web UI only show partial 
taskmanager log
URL: https://github.com/apache/flink/pull/6329#issuecomment-446841558
 
 
   @jinglining Not currently. I encountered this problem in our production 
environment, fixed it, and verified the fix. The fix has been merged into Flink 
1.6.0, and our current production environment runs 1.6.0, where it behaves 
normally. What problem have you encountered?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates

2018-12-12 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-11136.
-
   Resolution: Fixed
Fix Version/s: 1.7.1
   1.8.0
   1.6.3

Fixed for 1.8.0 with ed0aefa6775f655591b4c0fe46382446921b7155
Fixed for 1.7.1 with ff1821a6d2f8317d0c344719b14350ac362143d9
Fixed for 1.6.3 with ff9b7f1b60a4aeb1d925b236b9818002aad830de


> Fix the logical of merge for DISTINCT aggregates
> 
>
> Key: FLINK-11136
> URL: https://issues.apache.org/jira/browse/FLINK-11136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The logic of merge for DISTINCT aggregates has bug. For the following query:
> {code:java}
> SELECT
>   c,
>   COUNT(DISTINCT b),
>   SUM(DISTINCT b),
>   SESSION_END(rowtime, INTERVAL '0.005' SECOND)
> FROM MyTable
> GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code}
> the following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be 
> cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
> at 
> org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
> at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
> at 
> org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ifndef-SleePy commented on issue #7290: [FLINK-11137] [runtime] Fix unexpected RegistrationTimeoutException of TaskExecutor

2018-12-12 Thread GitBox
ifndef-SleePy commented on issue #7290: [FLINK-11137] [runtime] Fix unexpected 
RegistrationTimeoutException of TaskExecutor
URL: https://github.com/apache/flink/pull/7290#issuecomment-446838962
 
 
   The Travis build failed in the connectors module. I checked the failed case; 
it works well in my local environment, so I think it is probably an unstable 
test.
   I have updated the PR to trigger the Travis integration tests again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11001) Window rowtime attribute can't be renamed in Java

2018-12-12 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-11001.
-
   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed for 1.8.0 with ff9b7f1b60a4aeb1d925b236b9818002aad830de

> Window rowtime attribute can't be renamed in Java
> -
>
> Key: FLINK-11001
> URL: https://issues.apache.org/jira/browse/FLINK-11001
> Project: Flink
>  Issue Type: Bug
> Environment: 
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently, we can rename window rowtime attribute like this in Scala:
> {code:java}
> table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
> {code}
> However, an exception will be thrown if we use java(by changing the 
> Expressions to String):
> {code:java}
> table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select("w.rowtime as rowtime, int.count as int")
> {code}
> The Exception is:
> {code:java}
> org.apache.flink.table.api.ExpressionParserException: Could not parse 
> expression at column 11: `,' expected but `a' found
> w.rowtime as rowtime, int.count as int
> {code}
>  
> To solve the problem, we can add rename support in {{ExpressionParser}}. 
> However, this may conflict with the design of source which use as before 
> rowtime:
> {code:java}
> stream.toTable(
>   tEnv,
>   ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as 
> a"): _*)
> {code}
> Personally, I think we should keep the two consistent, so the final api would 
> be:
> {code:java}
> // window case
> .select("w.rowtime as rowtime, int.count as int")
> // source case
> stream.toTable(
>   tEnv,
>   ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): 
> _*)
> {code}
> Any suggestions would be greatly appreciated!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs

2018-12-12 Thread GitBox
fhueske commented on issue #7248: [hotfix][tableApi] Fix the description 
information of intersect in tableApi docs
URL: https://github.com/apache/flink/pull/7248#issuecomment-446837983
 
 
   Consistent documentation is a good point. However, I'd rather change the 
other examples than indicate a limitation that does not exist.
   
   Regarding the tests, IMO they should not be considered documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11146) Get rid of legacy codes from ClusterClient

2018-12-12 Thread TisonKun (JIRA)
TisonKun created FLINK-11146:


 Summary: Get rid of legacy codes from ClusterClient
 Key: FLINK-11146
 URL: https://issues.apache.org/jira/browse/FLINK-11146
 Project: Flink
  Issue Type: Sub-task
  Components: Client, Tests
Affects Versions: 1.8.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.8.0


As [~StephanEwen] mentioned in ML, the client needs big refactoring / cleanup. 
It should use a proper HTTP client library to help with future authentication 
mechanisms.

After an investigation I notice that the valid cluster clients are only 
{{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, 
{{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 
codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. With 
this removal we arrive at a clean stage where we can think about how to implement a 
proper HTTP client more comfortably.

1. {{StandaloneClusterClient}} is now depended on by 
{{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) 
and {{FlinkClient}}(part of flink-storm which is decided to be removed 
FLINK-10571). Also relevant tests need to be ported(or directly removed).

2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 Remove 
legacy flink-yarn component.

3. Testing classes inheriting from {{ClusterClient}} need to be ported(or 
directly removed).

4. Get rid of legacy codes inside {{ClusterClient}} itself, such as 
{{#run(JobGraph, ClassLoader)}}

Besides, what is {{JobClient}} used for? I cannot find valid usages of it.
cc [~mxm] [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11139) stream non window join support state ttl

2018-12-12 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719735#comment-16719735
 ] 

Fabian Hueske commented on FLINK-11139:
---

[~zhaoshijie] I gave you contributor permissions.

> stream non window join support state ttl
> 
>
> Key: FLINK-11139
> URL: https://issues.apache.org/jira/browse/FLINK-11139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: zhaoshijie
>Priority: Major
>
> The stream non-window join function uses timers to delete expired data. This 
> is fine for a small amount of data or a short expiration time, but it will 
> cause OOM (too many timers) on the TaskManager when there is a long 
> expiration time and a large amount of data. In fact, other stateful functions 
> in the table module have the same problem; I would like to contribute a fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11139) stream non window join support state ttl

2018-12-12 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719734#comment-16719734
 ] 

Fabian Hueske commented on FLINK-11139:
---

[~zhaoshijie], I assume you configured an [idle state retention 
time|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time].
 How did you choose the min and max values? The difference between min and max 
determines how often a timer is registered.

Moreover, [~hequn8128] reworked the clean-up timers such that they use the 
timer delete feature that was added in Flink 1.6.0. 
The PR was merged a few days ago and should already fix the problem.

> stream non window join support state ttl
> 
>
> Key: FLINK-11139
> URL: https://issues.apache.org/jira/browse/FLINK-11139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: zhaoshijie
>Priority: Major
>
> The stream non-window join function uses timers to delete expired data. This 
> is fine for a small amount of data or a short expiration time, but it will 
> cause OOM (too many timers) on the TaskManager when there is a long 
> expiration time and a large amount of data. In fact, other stateful functions 
> in the table module have the same problem; I would like to contribute a fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates

2018-12-12 Thread GitBox
dianfu commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of 
DISTINCT aggregates
URL: https://github.com/apache/flink/pull/7284#issuecomment-446836298
 
 
   Thanks a lot  for the review and merge @fhueske !


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates

2018-12-12 Thread GitBox
asfgit closed pull request #7284: [FLINK-11136] [table] Fix the merge logic of 
DISTINCT aggregates
URL: https://github.com/apache/flink/pull/7284
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 566e3d7cbc5..57cc815fee0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -142,6 +142,21 @@ class AggregationCodeGenerator(
   fields.mkString(", ")
 }
 
+val parametersCodeForDistinctMerge = aggFields.map { inFields =>
+  val fields = inFields.filter(_ > -1).zipWithIndex.map { case (f, i) =>
+// index to constant
+if (f >= physicalInputTypes.length) {
+  constantFields(f - physicalInputTypes.length)
+}
+// index to input field
+else {
+  s"(${CodeGenUtils.boxedTypeTermForTypeInfo(physicalInputTypes(f))}) 
k.getField($i)"
+}
+  }
+
+  fields.mkString(", ")
+}
+
 // get method signatures
 val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes)
 val constantClasses = 
UserDefinedFunctionUtils.typeInfoToClass(constantTypes)
@@ -643,7 +658,7 @@ class AggregationCodeGenerator(
|  (${classOf[Row].getCanonicalName}) entry.getKey();
|  Long v = (Long) entry.getValue();
|  if (aDistinctAcc$i.add(k, v)) {
-   |${aggs(i)}.accumulate(aAcc$i, k);
+   |${aggs(i)}.accumulate(aAcc$i, 
${parametersCodeForDistinctMerge(i)});
|  }
|}
|a.setField($i, aDistinctAcc$i);
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 46dde8e0225..ddc2a687541 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -78,6 +78,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
 val sqlQuery = "SELECT c, " +
   "  COUNT(DISTINCT b)," +
+  "  SUM(DISTINCT b)," +
   "  SESSION_END(rowtime, INTERVAL '0.005' SECOND) " +
   "FROM MyTable " +
   "GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c "
@@ -87,9 +88,9 @@ class SqlITCase extends StreamingWithStateTestBase {
 env.execute()
 
 val expected = Seq(
-  "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till 
{14L}
-  "Hello,1,1970-01-01 00:00:00.021",   // window starts at [16L] till 
{21L}, not merged
-  "Hello,3,1970-01-01 00:00:00.015"// window starts at [1L,2L],
+  "Hello World,1,9,1970-01-01 00:00:00.014", // window starts at [9L] till 
{14L}
+  "Hello,1,16,1970-01-01 00:00:00.021",   // window starts at [16L] 
till {21L}, not merged
+  "Hello,3,6,1970-01-01 00:00:00.015"// window starts at [1L,2L],
//   merged with [8L,10L], by 
[4L], till {15L}
 )
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7297: [hotfix] [docs] Fix typos in Table and SQL docs

2018-12-12 Thread GitBox
asfgit closed pull request #7297: [hotfix] [docs] Fix typos in Table and SQL 
docs
URL: https://github.com/apache/flink/pull/7297
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 55c144a8ed2..1e7ef83fcb7 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -22,7 +22,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-SQL queries are specified with the `sqlQuery()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a 
`Table`. A `Table` can be used in [subsequent SQL and Table API 
queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or 
DataStream](common.html#integration-with-datastream-and-dataset-api), or 
[written to a TableSink](common.html#emit-a-table)). SQL and Table API queries 
can seamlessly mixed and are holistically optimized and translated into a 
single program.
+SQL queries are specified with the `sqlQuery()` method of the 
`TableEnvironment`. The method returns the result of the SQL query as a 
`Table`. A `Table` can be used in [subsequent SQL and Table API 
queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or 
DataStream](common.html#integration-with-datastream-and-dataset-api), or 
[written to a TableSink](common.html#emit-a-table)). SQL and Table API queries 
can be seamlessly mixed and are holistically optimized and translated into a 
single program.
 
 In order to access a table in a SQL query, it must be [registered in the 
TableEnvironment](common.html#register-tables-in-the-catalog). A table can be 
registered from a [TableSource](common.html#register-a-tablesource), 
[Table](common.html#register-a-table), [DataStream, or 
DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, 
users can also [register external catalogs in a 
TableEnvironment](common.html#register-an-external-catalog) to specify the 
location of the data sources.
 
diff --git a/docs/dev/table/streaming/dynamic_tables.md 
b/docs/dev/table/streaming/dynamic_tables.md
index 698cd673185..3420af73840 100644
--- a/docs/dev/table/streaming/dynamic_tables.md
+++ b/docs/dev/table/streaming/dynamic_tables.md
@@ -53,12 +53,12 @@ The following table compares traditional relational algebra 
and stream processin

 
 
-Despite these differences, processing streams with relational queries and SQL 
is not impossible. Advanced relational database systems offer a feature called 
*Materialized Views*. A materialized view is defined as a SQL query, just like 
a regular virtual view. In contrast to a virtual view, a materialized view 
caches the result of the query such that the query does not need to be 
evaluated when the view is accessed. A common challenge for caching is to 
prevent a cache from serving outdated results. A materialized view becomes 
outdated when the base tables of its definition query are modified. *Eager View 
Maintenance* is a technique to update materialized views and updates a 
materialized view as soon as its base tables are updated.
+Despite these differences, processing streams with relational queries and SQL 
is not impossible. Advanced relational database systems offer a feature called 
*Materialized Views*. A materialized view is defined as a SQL query, just like 
a regular virtual view. In contrast to a virtual view, a materialized view 
caches the result of the query such that the query does not need to be 
evaluated when the view is accessed. A common challenge for caching is to 
prevent a cache from serving outdated results. A materialized view becomes 
outdated when the base tables of its definition query are modified. *Eager View 
Maintenance* is a technique to update a materialized view as soon as its base 
tables are updated.
 
 The connection between eager view maintenance and SQL queries on streams 
becomes obvious if we consider the following:
 
 - A database table is the result of a *stream* of `INSERT`, `UPDATE`, and 
`DELETE` DML statements, often called *changelog stream*.
-- A materialized view is defined as a SQL query. In order to update the view, 
the query is continuously processes the changelog streams of the view's base 
relations.
+- A materialized view is defined as a SQL query. In order to update the view, 
the query continuously processes the changelog streams of the view's base 
relations.
 - The materialized view is the result of the streaming SQL query.
 
 With these points in mind, we introduce following concept of *Dynamic tables* 
in the next section.
@@ -177,7 +177,7 @@ When converting a dynamic table into a stream or writing it 
to an external syste
 
 
 
-* **Upsert stream:** An upsert stream is a stream with two types

[GitHub] jinglining commented on issue #6329: [FLINK-9841] Web UI only show partial taskmanager log

2018-12-12 Thread GitBox
jinglining commented on issue #6329: [FLINK-9841] Web UI only show partial 
taskmanager log
URL: https://github.com/apache/flink/pull/6329#issuecomment-446836098
 
 
   > @zentol yes, you are right, sorry about my wording. Here we should not 
use try-with-resources, because the listener will close the file. And it seems 
try-with-resources closes the file faster than the completion listener does.
   
   Hi @yanghua, do you have a test case for this solution?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7282: [hotfix][docs] imporve docs of batch overview

2018-12-12 Thread GitBox
asfgit closed pull request #7282: [hotfix][docs] imporve docs of batch overview
URL: https://github.com/apache/flink/pull/7282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index d0043647227..b4dd1671d85 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -406,7 +406,8 @@ DataSet result = in.partitionByRange(0)
   Note: This method works only on single field keys.
 {% highlight java %}
 DataSet> in = // [...]
-DataSet result = in.partitionCustom(Partitioner partitioner, key)
+DataSet result = in.partitionCustom(partitioner, key)
+.mapPartition(new PartitionMapper());
 {% endhighlight %}
   
 
@@ -709,7 +710,7 @@ val result = in.partitionByRange(0).mapPartition { ... }
 {% highlight scala %}
 val in: DataSet[(Int, String)] = // [...]
 val result = in
-  .partitionCustom(partitioner: Partitioner[K], key)
+  .partitionCustom(partitioner, key).mapPartition { ... }
 {% endhighlight %}
   
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11146) Get rid of legacy codes from ClusterClient

2018-12-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-11146:
-
Description: 
As [~StephanEwen] mentioned in ML, the client needs big refactoring / cleanup. 
It should use a proper HTTP client library to help with future authentication 
mechanisms.

After an investigation I notice that the valid cluster clients are only 
{{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, 
{{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 
codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. With 
this removal we arrive at a clean stage where we can think about how to implement a 
proper HTTP client more comfortably.

1. {{StandaloneClusterClient}} is now depended on by 
{{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) 
and {{FlinkClient}}(part of flink-storm which is decided to be removed 
FLINK-10571). Also relevant tests need to be ported(or directly removed).

2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 Remove 
legacy flink-yarn component.

3. Testing classes inheriting from {{ClusterClient}} need to be ported(or 
directly removed).

4. Get rid of legacy codes inside {{ClusterClient}} itself, such as 
{{#run(JobGraph, ClassLoader)}}

Besides, what is {{JobClient}} used for? I cannot find valid usages of it. 
(Till mentioned it at ML 
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E)
cc [~mxm] [~till.rohrmann]

  was:
As [~StephanEwen] mentioned on the ML, the client needs a big refactoring / 
cleanup. It should use a proper HTTP client library to help with future 
authentication mechanisms.

After an investigation I noticed that the only valid cluster clients are 
{{MiniClusterClient}} and {{RestClusterClient}}. The legacy clients, 
{{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as the 
pre-FLIP-6 code inside {{ClusterClient}}, should be removed as part of 
FLINK-10392. With this removal we arrive at a clean state where we can think 
more comfortably about how to implement a proper HTTP client.

1. {{StandaloneClusterClient}} is currently depended on by 
{{LegacyStandaloneClusterDescriptor}} (whose removal is tracked by FLINK-10700) 
and {{FlinkClient}} (part of flink-storm, which it was decided to remove in 
FLINK-10571). The relevant tests also need to be ported (or directly removed).

2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 
(Remove legacy flink-yarn component).

3. Testing classes inheriting from {{ClusterClient}} need to be ported (or 
directly removed).

4. Get rid of the legacy code inside {{ClusterClient}} itself, such as 
{{#run(JobGraph, ClassLoader)}}.

Besides, what is {{JobClient}} used for? I cannot find valid usages of it.
cc [~mxm] [~till.rohrmann]


> Get rid of legacy codes from ClusterClient
> --
>
> Key: FLINK-11146
> URL: https://issues.apache.org/jira/browse/FLINK-11146
> Project: Flink
>  Issue Type: Task
>  Components: Client, Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.8.0
>
>
> As [~StephanEwen] mentioned on the ML, the client needs a big refactoring / 
> cleanup. It should use a proper HTTP client library to help with future 
> authentication mechanisms.
> After an investigation I noticed that the only valid cluster clients are 
> {{MiniClusterClient}} and {{RestClusterClient}}. The legacy clients, 
> {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as the 
> pre-FLIP-6 code inside {{ClusterClient}}, should be removed as part of 
> FLINK-10392. With this removal we arrive at a clean state where we can think 
> more comfortably about how to implement a proper HTTP client.
> 1. {{StandaloneClusterClient}} is currently depended on by 
> {{LegacyStandaloneClusterDescriptor}} (whose removal is tracked by 
> FLINK-10700) and {{FlinkClient}} (part of flink-storm, which it was decided 
> to remove in FLINK-10571). The relevant tests also need to be ported (or 
> directly removed).
> 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 
> (Remove legacy flink-yarn component).
> 3. Testing classes inheriting from {{ClusterClient}} need to be ported (or 
> directly removed).
> 4. Get rid of the legacy code inside {{ClusterClient}} itself, such as 
> {{#run(JobGraph, ClassLoader)}}.
> Besides, what is {{JobClient}} used for? I cannot find valid usages of it. 
> (Till mentioned it on the ML: 
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E)
> cc [~mxm] [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11146) Get rid of legacy codes from ClusterClient

2018-12-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-11146:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-10392)

> Get rid of legacy codes from ClusterClient
> --
>
> Key: FLINK-11146
> URL: https://issues.apache.org/jira/browse/FLINK-11146
> Project: Flink
>  Issue Type: Task
>  Components: Client, Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.8.0
>
>
> As [~StephanEwen] mentioned on the ML, the client needs a big refactoring / 
> cleanup. It should use a proper HTTP client library to help with future 
> authentication mechanisms.
> After an investigation I noticed that the only valid cluster clients are 
> {{MiniClusterClient}} and {{RestClusterClient}}. The legacy clients, 
> {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as the 
> pre-FLIP-6 code inside {{ClusterClient}}, should be removed as part of 
> FLINK-10392. With this removal we arrive at a clean state where we can think 
> more comfortably about how to implement a proper HTTP client.
> 1. {{StandaloneClusterClient}} is currently depended on by 
> {{LegacyStandaloneClusterDescriptor}} (whose removal is tracked by 
> FLINK-10700) and {{FlinkClient}} (part of flink-storm, which it was decided 
> to remove in FLINK-10571). The relevant tests also need to be ported (or 
> directly removed).
> 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 
> (Remove legacy flink-yarn component).
> 3. Testing classes inheriting from {{ClusterClient}} need to be ported (or 
> directly removed).
> 4. Get rid of the legacy code inside {{ClusterClient}} itself, such as 
> {{#run(JobGraph, ClassLoader)}}.
> Besides, what is {{JobClient}} used for? I cannot find valid usages of it.
> cc [~mxm] [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11139) stream non window join support state ttl

2018-12-12 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719728#comment-16719728
 ] 

Hequn Cheng commented on FLINK-11139:
-

[~zhaoshijie] Hi, thanks for reporting this and offering to fix it. Do you have 
a concrete plan for how to fix it? It is best to share your ideas before 
opening a PR. :)

> stream non window join support state ttl
> 
>
> Key: FLINK-11139
> URL: https://issues.apache.org/jira/browse/FLINK-11139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: zhaoshijie
>Priority: Major
>
> The stream non-window join function uses timers to delete expired data. This 
> works for small amounts of data or short expiration times, but the 
> TaskManager will run out of memory (too many timers) when there is a long 
> expiration time and a large amount of data. In fact, other stateful functions 
> in the table module have the same problem. I would like to contribute a fix.
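A minimal sketch of what using state TTL instead of timers could look like 
(state name and retention time below are made up for illustration; the TTL API 
itself exists since Flink 1.6):

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.types.Row;

// Instead of registering one timer per key, the join state could rely on
// Flink's state TTL: entries expire lazily, without any timers being created.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(12))                                // retention time (made up)
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh TTL on every write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Row> joinStateDescriptor =
    new ValueStateDescriptor<>("non-window-join-state", Row.class);
joinStateDescriptor.enableTimeToLive(ttlConfig);
{code}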



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java

2018-12-12 Thread GitBox
hequn8128 commented on issue #7289: [FLINK-11001][table] Fix window rowtime 
attribute can't be renamed bug in Java
URL: https://github.com/apache/flink/pull/7289#issuecomment-446833240
 
 
   @fhueske Thanks a lot for your review!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11138) RocksDB's wal seems to be useless in rocksdb-state-backend that reduce the disk's overhead

2018-12-12 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719713#comment-16719713
 ] 

Yun Tang commented on FLINK-11138:
--

Yes, you are right, but we have already disabled the WAL of RocksDB. You can 
see the configuration 
[here|https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L327].
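For reference, a small sketch of what disabling the WAL amounts to in the 
RocksDB Java API (the database path is made up; the linked backend code does 
the equivalent internally):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;

public class DisableWalSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-sketch");
             WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
            // every write issued with these options skips the write-ahead log
            db.put(writeOptions, "key".getBytes(), "value".getBytes());
        }
    }
}
{code}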

> RocksDB's wal seems to be useless in rocksdb-state-backend that reduce the 
> disk's overhead
> --
>
> Key: FLINK-11138
> URL: https://issues.apache.org/jira/browse/FLINK-11138
> Project: Flink
>  Issue Type: Improvement
>Reporter: cailiuyang
>Priority: Major
>
> The WAL of RocksDB is used for recovery after a crash, but after a Flink job 
> crashes, it always recovers from the last checkpoint, so the WAL seems 
> useless. Am I right?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode

2018-12-12 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-11088:
--
Description: 
Currently flink-yarn assumes the keytab is shipped as an application master 
environment local resource on the client side and will be distributed to all 
the TMs. This does not work for YARN proxy user mode [1], since a proxy user or 
super user might not have access to the actual users' keytabs, but can request 
delegation tokens on the users' behalf. 

Based on the security options for long-lived YARN services [2], we propose to 
make the keytab file path discovery configurable depending on the launch mode 
of the YARN client. 

Reference: 
[1] 
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
[2] 
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services


  was:
Currently flink-yarn assumes the keytab is shipped as an application master 
environment local resource on the client side and will be distributed to all 
the TMs. This does not work for YARN proxy user mode, since a proxy user or 
super user does not have access to the actual user's keytab but only to 
delegation tokens. 

We propose to make the keytab file path discovery configurable depending on the 
launch mode of the YARN client. 

Reference: 
[1] 
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
[2] 
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services



> Improve Kerberos Authentication using Keytab in YARN proxy user mode
> 
>
> Key: FLINK-11088
> URL: https://issues.apache.org/jira/browse/FLINK-11088
> Project: Flink
>  Issue Type: Improvement
>  Components: Security, YARN
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently flink-yarn assumes the keytab is shipped as an application master 
> environment local resource on the client side and will be distributed to all 
> the TMs. This does not work for YARN proxy user mode [1], since a proxy user 
> or super user might not have access to the actual users' keytabs, but can 
> request delegation tokens on the users' behalf. 
> Based on the security options for long-lived YARN services [2], we propose to 
> make the keytab file path discovery configurable depending on the launch mode 
> of the YARN client. 
> Reference: 
> [1] 
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
> [2] 
> https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9529) Do not expose keyed operations on ProcessFunction.Context

2018-12-12 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-9529:
--

Assignee: boshu Zheng

> Do not expose keyed operations on ProcessFunction.Context
> -
>
> Key: FLINK-9529
> URL: https://issues.apache.org/jira/browse/FLINK-9529
> Project: Flink
>  Issue Type: Sub-task
>  Components: Java API
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: boshu Zheng
>Priority: Major
>
> Currently, the {{ProcessFunction}} provides a {{Context}} object from which 
> one can access the {{TimerService}}. The {{TimerService}} allows registering 
> timers if one operates on a keyed stream. If it is not a keyed stream, then 
> this operation fails. Recently, we added a {{KeyedProcessFunction}} which 
> captures this semantic difference. I would propose to clean up the 
> {{Context}} interface to give access only to the operations supported when 
> operating on non-keyed and keyed streams, respectively.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11143) AskTimeoutException is thrown during job submission and completion

2018-12-12 Thread Qi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719694#comment-16719694
 ] 

Qi commented on FLINK-11143:


The hard-coded values I’ve found are [1] and [2]. I think they should be made 
configurable.  
 
[1] 
[https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L187]
[2] 
[https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117]
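A minimal sketch of the suggested direction (reusing {{WebOptions.TIMEOUT}} 
here is my assumption, not something either linked code path does today):

{code:java}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;

public class ConfigurableTimeoutSketch {
    // Read the timeout from the configuration instead of hard-coding it.
    public static Time resolveTimeout(Configuration config) {
        return Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
    }
}
{code}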

> AskTimeoutException is thrown during job submission and completion
> --
>
> Key: FLINK-11143
> URL: https://issues.apache.org/jira/browse/FLINK-11143
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.2
>Reporter: Alex Vinnik
>Priority: Major
>
> For more details please see the thread
> [http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3cc2fb26f9-1410-4333-80f4-34807481b...@gmail.com%3E]
> On submission 
> 2018-12-12 02:28:31 ERROR JobsOverviewHandler:92 - Implementation error: 
> Unhandled exception.
>  akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#225683351|#225683351]] after [1 ms]. 
> Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>  at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>  at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>  at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>  at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>  at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>  at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>  at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>  at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>  at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>  at java.lang.Thread.run(Thread.java:748)
>  
> On completion
>  
> {"errors":["Internal server error."," side:\njava.util.concurrent.CompletionException: 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. 
> Sender[null] sent message of type 
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)\nCaused by: 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms].

[jira] [Commented] (FLINK-10882) Misleading job/task state for scheduled jobs

2018-12-12 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719670#comment-16719670
 ] 

lining commented on FLINK-10882:


Looking at the code, org.apache.flink.runtime.jobgraph.JobStatus currently 
defines RUNNING to mean:

"Some tasks are scheduled or running, some may be pending, some may be 
finished."

Should we add a new status?

> Misleading job/task state for scheduled jobs
> 
>
> Key: FLINK-10882
> URL: https://issues.apache.org/jira/browse/FLINK-10882
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
> Attachments: list_view.png, task_view.png
>
>
> Submitting a job when not enough resources are available currently causes the 
> job to stay in a {{CREATED/SCHEDULED}} state.
> There are 2 issues with how this is presented in the UI.
> The {{Running Jobs}} page incorrectly states that the job is running.
> (see list_view attachment)
> EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING 
> state.
> The state display for individual tasks either
> # States the task is in a CREATED state, when it is actually SCHEDULED
> # States the task is in a CREATED state, but the count for all state boxes is 
> zero.
> (see task_view attachment)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10882) Misleading job/task state for scheduled jobs

2018-12-12 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719654#comment-16719654
 ] 

lining commented on FLINK-10882:


Hi [~Zentol], I'm interested in this. Can you assign this Jira to me?

> Misleading job/task state for scheduled jobs
> 
>
> Key: FLINK-10882
> URL: https://issues.apache.org/jira/browse/FLINK-10882
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
> Attachments: list_view.png, task_view.png
>
>
> Submitting a job when not enough resources are available currently causes the 
> job to stay in a {{CREATED/SCHEDULED}} state.
> There are 2 issues with how this is presented in the UI.
> The {{Running Jobs}} page incorrectly states that the job is running.
> (see list_view attachment)
> EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING 
> state.
> The state display for individual tasks either
> # States the task is in a CREATED state, when it is actually SCHEDULED
> # States the task is in a CREATED state, but the count for all state boxes is 
> zero.
> (see task_view attachment)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager

2018-12-12 Thread Sergei Poganshev (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719651#comment-16719651
 ] 

Sergei Poganshev commented on FLINK-11127:
--

Here's a (slightly tricky) workaround that works (a sketch of the relevant 
deployment fragments follows below):
 * configure the *metrics.internal.query-service.port* property to some fixed 
port
 * in the taskmanager deployment:
 ** expose that fixed port in the taskmanager deployment spec
 ** use an init container to put the taskmanager pod's IP into the flink 
configuration file:
 *** define a shared _emptyDir_ volume for *both* the flink container and the 
init container; the flink container will mount it at /etc/flink
 *** init container:
 **** add an environment variable to the init container definition that 
references *status.podIP*
 **** mount the flink configuration folder into the init container (anywhere 
other than /etc/flink) - this will be used as the basis for changes
 **** copy the files from this folder to /etc/flink
 **** make the init container command append the pod IP to flink-conf.yaml as 
the value of the *taskmanager.host* property
 *** since /etc/flink is mounted into both containers, changes made in the init 
container will be visible in the flink container

Configured this way, the jobmanager connects directly to the taskmanagers' IPs 
on the configured fixed port.
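A sketch of the relevant taskmanager deployment fragments (container names, 
image, ConfigMap name and the port value are placeholders; this illustrates the 
steps above and is not a tested manifest):

{code}
spec:
  volumes:
    - name: flink-config          # shared emptyDir written by the init container
      emptyDir: {}
    - name: base-config           # the unmodified flink-conf.yaml, e.g. from a ConfigMap
      configMap:
        name: flink-config-map    # placeholder name
  initContainers:
    - name: prepare-flink-config
      image: busybox              # placeholder image
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
      # copy the base config, then append the pod IP as taskmanager.host
      command: ["sh", "-c",
        "cp /base-conf/* /etc/flink/ && echo \"taskmanager.host: $POD_IP\" >> /etc/flink/flink-conf.yaml"]
      volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
        - name: base-config
          mountPath: /base-conf
  containers:
    - name: taskmanager
      ports:
        - containerPort: 50101    # the fixed metrics.internal.query-service.port (placeholder)
      volumeMounts:
        - name: flink-config
          mountPath: /etc/flink
{code}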

> Make metrics query service establish connection to JobManager
> -
>
> Key: FLINK-11127
> URL: https://issues.apache.org/jira/browse/FLINK-11127
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Kubernetes, Metrics
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Priority: Major
>
> As part of FLINK-10247, the internal metrics query service has been separated 
> into its own actor system. Before this change, the JobManager (JM) queried 
> TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a 
> separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically 
> *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused 
> by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users require a service 
> *for each* TM instance which is not practical.
> This currently results in the web UI not being able to display some basic 
> metrics about the number of sent records. You can reproduce this by following 
> the READMEs in {{flink-container/kubernetes}}.
> This worked before, because the JM is typically exposed via a service with a 
> known name and the TMs establish the connection to it which the metrics query 
> service piggybacked on.
> A potential solution to this might be to let the query service connect to the 
> JM similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I 
> would consider this to be a bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7293) Support custom order by in PatternStream

2018-12-12 Thread Dian Fu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-7293.
--

> Support custom order by in PatternStream
> 
>
> Key: FLINK-7293
> URL: https://issues.apache.org/jira/browse/FLINK-7293
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> Currently, when {{ProcessingTime}} is configured, the events are fed to NFA 
> in the order of the arriving time and when {{EventTime}} is configured, the 
> events are fed to NFA in the order of the event time. It should also allow 
> custom {{order by}} to allow users to define the order of the events besides 
> the above factors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java

2018-12-12 Thread GitBox
fhueske commented on issue #7289: [FLINK-11001][table] Fix window rowtime 
attribute can't be renamed bug in Java
URL: https://github.com/apache/flink/pull/7289#issuecomment-446810382
 
 
   Thanks for the fix @hequn8128!
   
   I'll merge this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua edited a comment on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs

2018-12-12 Thread GitBox
yanghua edited a comment on issue #7248: [hotfix][tableApi] Fix the description 
information of intersect in tableApi docs
URL: https://github.com/apache/flink/pull/7248#issuecomment-446808073
 
 
   @fhueske you are right. 
   But there are a few points that could mislead some users:
   
   * Sometimes fields with the same names are more easily visually identified 
as having the same types;
   * Multiple test methods in `SetOperatorsITCase` explicitly specify the same 
fields (a, b, c) for the left and right tables. Since the document shows only 
an incomplete example, users will sometimes search the source code after 
reading it to see real cases in the project;
   * Other multi-table operators in the same document, such as `union` and 
`minus`, also use the same field names
   
   This is why I said LGTM. But in fact, for those who understand how this 
operator is used, it does not matter whether it is modified or not.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs

2018-12-12 Thread GitBox
yanghua commented on issue #7248: [hotfix][tableApi] Fix the description 
information of intersect in tableApi docs
URL: https://github.com/apache/flink/pull/7248#issuecomment-446808073
 
 
   @fhueske you are right. 
   But there are a few points that could mislead some users:
   
   * Sometimes fields with the same names are more easily visually identified 
as having the same types;
   * Multiple test methods in `SetOperatorsITCase` explicitly specify the same 
fields (a, b, c) for the left and right tables. Since the document shows only 
an incomplete example, users will sometimes search the source code after 
reading it to see real cases in the project;
   * Other multi-table operators in the same document, such as `union` and 
`minus`, also use the same field names
   
   This is why I said LGTM. But in fact, for those who understand how this 
operator is used, it does not matter whether it is modified or not.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates

2018-12-12 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-11136:
--
Issue Type: Bug  (was: Test)

> Fix the logical of merge for DISTINCT aggregates
> 
>
> Key: FLINK-11136
> URL: https://issues.apache.org/jira/browse/FLINK-11136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> The logic of merge for DISTINCT aggregates has bug. For the following query:
> {code:java}
> SELECT
>   c,
>   COUNT(DISTINCT b),
>   SUM(DISTINCT b),
>   SESSION_END(rowtime, INTERVAL '0.005' SECOND)
> FROM MyTable
> GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code}
> the following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be 
> cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
> at 
> org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
> at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
> at 
> org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates

2018-12-12 Thread GitBox
fhueske commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of 
DISTINCT aggregates
URL: https://github.com/apache/flink/pull/7284#issuecomment-446804905
 
 
   Thanks for fixing this bug @dianfu!
   
   Will merge this fix.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-9232) Add harness test for AggregationCodeGenerator

2018-12-12 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong closed FLINK-9232.

Resolution: Duplicate

> Add harness test for AggregationCodeGenerator 
> --
>
> Key: FLINK-9232
> URL: https://issues.apache.org/jira/browse/FLINK-9232
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Instead of relying on ITCase to cover the codegen result. We should have 
> direct test against that, for example using Harness test framework.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #7282: [hotfix][docs] imporve docs of batch overview

2018-12-12 Thread GitBox
fhueske commented on issue #7282: [hotfix][docs] imporve docs of batch overview
URL: https://github.com/apache/flink/pull/7282#issuecomment-446795624
 
 
   Thanks for the PR @KarmaGYZ.
   
   I'll merge it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #7297: [hotfix] [docs] Fix typos in Table and SQL docs

2018-12-12 Thread GitBox
fhueske commented on issue #7297: [hotfix] [docs] Fix typos in Table and SQL 
docs
URL: https://github.com/apache/flink/pull/7297#issuecomment-446791047
 
 
   Thanks for the PR @afedulov.
   
   I'll merge it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs

2018-12-12 Thread GitBox
fhueske commented on issue #7248: [hotfix][tableApi] Fix the description 
information of intersect in tableApi docs
URL: https://github.com/apache/flink/pull/7248#issuecomment-446785876
 
 
   Hi @maqingxiang, thanks for this PR.
   The current documentation is correct. 
   For `intersect` / `intersectAll`, both input tables only need to have the 
same field types but the field names can differ. I'd like to keep the docs as 
they are to make this behavior clear.
   Would you please close this PR? 
   
   Thank you, Fabian.
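   To make that concrete, a small self-contained sketch (class and variable 
names made up): both inputs have the field types `(Long, String)` but different 
field names, and `intersect` accepts them:
   
   ```java
   import org.apache.flink.api.java.ExecutionEnvironment;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.TableEnvironment;
   import org.apache.flink.table.api.java.BatchTableEnvironment;
   import org.apache.flink.types.Row;
   
   public class IntersectSketch {
       public static void main(String[] args) throws Exception {
           ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
           BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
   
           // same field types (Long, String) on both sides, different field names
           Table left  = tEnv.fromDataSet(env.fromElements(Tuple2.of(1L, "a")), "x, y");
           Table right = tEnv.fromDataSet(env.fromElements(Tuple2.of(1L, "a")), "u, v");
   
           Table result = left.intersect(right); // valid: only the types must match
           tEnv.toDataSet(result, Row.class).print();
       }
   }
   ```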


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher

2018-12-12 Thread GitBox
zentol commented on a change in pull request #7295: [FLINK-10748] Decouple 
RestServerEndpoint from Dispatcher
URL: https://github.com/apache/flink/pull/7295#discussion_r241158558
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java
 ##
 @@ -52,25 +46,6 @@
 
public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
 
-   public static CompletableFuture<Optional<String>> getRedirectAddress(
-   String localRestAddress,
-   RestfulGateway restfulGateway,
-   Time timeout) {
-
-   return restfulGateway.requestRestAddress(timeout).thenApply(
-   (String remoteRestAddress) -> {
-   if (Objects.equals(localRestAddress, 
remoteRestAddress)) {
-   return Optional.empty();
-   } else {
-   return Optional.of(remoteRestAddress);
-   }
-   });
-   }
-
-   public static HttpResponse getRedirectResponse(String redirectAddress, 
String path) {
-   return getRedirectResponse(redirectAddress, path, 
HttpResponseStatus.TEMPORARY_REDIRECT);
-   }
-
public static HttpResponse getRedirectResponse(String redirectAddress, 
String path, HttpResponseStatus code) {
 
 Review comment:
   I suppose the remaining ones are used for the SSL redirection?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher

2018-12-12 Thread GitBox
zentol commented on a change in pull request #7295: [FLINK-10748] Decouple 
RestServerEndpoint from Dispatcher
URL: https://github.com/apache/flink/pull/7295#discussion_r241160578
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/LeaderRetrievalHandlerTest.java
 ##
 @@ -101,31 +87,14 @@ public void testRedirectHandler() throws Exception {
configuration);
 
try (HttpTestClient httpClient = new 
HttpTestClient("localhost", bootstrap.getServerPort())) {
-   // 1. without completed local address future --> 
Internal server error
+   // 1. no leader gateway available --> Service 
unavailable
httpClient.sendGetRequest(restPath, 
FutureUtils.toFiniteDuration(timeout));
 
HttpTestClient.SimpleHttpResponse response = 
httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
 
-   
Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
response.getStatus());
-
-   // 2. with completed local address future but no leader 
gateway available --> Service unavailable
-   localAddressFuture.complete(correctAddress);
-
-   httpClient.sendGetRequest(restPath, 
FutureUtils.toFiniteDuration(timeout));
-
-   response = 
httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
-

Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, 
response.getStatus());
 
-   // 3. with leader gateway which is not the one of this 
REST endpoints --> Redirection required
-   httpClient.sendGetRequest(restPath, 
FutureUtils.toFiniteDuration(timeout));
-
-   response = 
httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
-
-   
Assert.assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, 
response.getStatus());
-   Assert.assertEquals(expectedRedirection, 
response.getLocation());
-
-   // 4. with local REST endpoint
+   // 2. with leader
 
 Review comment:
   help me out here, why is the leader now suddenly available?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] afedulov opened a new pull request #7297: [hotfix] [docs] Fix typos in Table and SQL docs

2018-12-12 Thread GitBox
afedulov opened a new pull request #7297: [hotfix] [docs] Fix typos in Table 
and SQL docs
URL: https://github.com/apache/flink/pull/7297
 
 
   
   
   ## What is the purpose of the change
   
   Fix typos in Table and SQL docs.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-12 Thread GitBox
fhueske commented on issue #7276: [FLINK-10566] Fix exponential planning time 
of large programs
URL: https://github.com/apache/flink/pull/7276#issuecomment-446699926
 
 
   There was a discussion about a 1.5.6 on the dev ML (started by Till) a few 
days ago and it was concluded that there will be another release.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11145) Fix Hadoop version handling in binary release script

2018-12-12 Thread Thomas Weise (JIRA)
Thomas Weise created FLINK-11145:


 Summary: Fix Hadoop version handling in binary release script
 Key: FLINK-11145
 URL: https://issues.apache.org/jira/browse/FLINK-11145
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Thomas Weise
Assignee: Thomas Weise


It is currently not possible to build the package for a single Hadoop version 
only (even though the logic exists in the script). Also the Hadoop versions we 
build for are repeated and inconsistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on issue #7278: [FLINK-11145] Allow for override of HADOOP_VERSION for single variant binary release build

2018-12-12 Thread GitBox
tweise commented on issue #7278: [FLINK-11145] Allow for override of 
HADOOP_VERSION for single variant binary release build
URL: https://github.com/apache/flink/pull/7278#issuecomment-446677083
 
 
   @zentol PTAL - with fix for the version duplication 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lamber-ken commented on issue #7279: [hotfix] add hadoop user name to yarn logs command

2018-12-12 Thread GitBox
lamber-ken commented on issue #7279: [hotfix] add hadoop user name to yarn logs 
command
URL: https://github.com/apache/flink/pull/7279#issuecomment-446672016
 
 
   hi, @zentol @tillrohrmann, cc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] lamber-ken opened a new pull request #7296: [hotfix][web] fix the desc about restart strategy in web

2018-12-12 Thread GitBox
lamber-ken opened a new pull request #7296: [hotfix][web] fix the desc about 
restart strategy in web
URL: https://github.com/apache/flink/pull/7296
 
 
   ## What is the purpose of the change
   
   In the Flink job configuration page, it's better to use `Restart strategy`.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7293: [FLINK-11144][tests] Make tests run on Java 9

2018-12-12 Thread GitBox
zentol commented on a change in pull request #7293: [FLINK-11144][tests] Make 
tests run on Java 9
URL: https://github.com/apache/flink/pull/7293#discussion_r241106082
 
 

 ##
 File path: pom.xml
 ##
 @@ -1602,7 +1616,7 @@ under the License.


org.apache.maven.plugins

maven-shade-plugin
-   3.0.0
+   3.1.1
 
 Review comment:
   please move this into the jdk9 profile; I don't have the time right now to 
verify that the shading still works correctly for all modules.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-12 Thread GitBox
mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of 
large programs
URL: https://github.com/apache/flink/pull/7276#issuecomment-44924
 
 
   Btw, I also want to backport this to 1.5 and 1.6. Do you know if there is 
another 1.5 release planned? Or is 1.5 already end of life with 1.7 being out?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager

2018-12-12 Thread Sergei Poganshev (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719153#comment-16719153
 ] 

Sergei Poganshev commented on FLINK-11127:
--

[~uce] Ah. Thanks for the heads up. We'll probably use StatefulSet just for 
development purposes for now (on a small number of nodes), until the problem is 
fixed.

> Make metrics query service establish connection to JobManager
> -
>
> Key: FLINK-11127
> URL: https://issues.apache.org/jira/browse/FLINK-11127
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Kubernetes, Metrics
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Priority: Major
>
> As part of FLINK-10247, the internal metrics query service has been separated 
> into its own actor system. Before this change, the JobManager (JM) queried 
> TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a 
> separate connection to the TM metrics query service actor.
> In the context of Kubernetes, this is problematic as the JM will typically 
> *not* be able to resolve the TMs by name, resulting in warnings as follows:
> {code}
> 2018-12-11 08:32:33,962 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused 
> by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve]
> {code}
> In order to expose the TMs by name in Kubernetes, users require a service 
> *for each* TM instance which is not practical.
> This currently results in the web UI not being able to display some basic 
> metrics about the number of sent records. You can reproduce this by following 
> the READMEs in {{flink-container/kubernetes}}.
> This worked before, because the JM is typically exposed via a service with a 
> known name and the TMs establish the connection to it which the metrics query 
> service piggybacked on.
> A potential solution to this might be to let the query service connect to the 
> JM similar to how the TMs register.
> I tagged this ticket as an improvement, but in the context of Kubernetes I 
> would consider this to be a bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10748) Jobmanager in HA setup, redirects (307) don't work when behind a load balancer

2018-12-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10748:
---
Labels: pull-request-available  (was: )

> Jobmanager in HA setup, redirects (307) don't work when behind a load balancer
> --
>
> Key: FLINK-10748
> URL: https://issues.apache.org/jira/browse/FLINK-10748
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2
>Reporter: Jeroen Steggink
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
>
> In a HA Jobmanager setup, connecting to a follower results in a redirect 
> (HTTP/1.1 307 Temporary Redirect) to the leader. However, it redirects to an 
> ip address (why is it not the hostname?) which is in another network and not 
> reachable. I have configured hostnames, not ip addresses.
> Wouldn't it be better to proxy the request to another jobmanager instead of 
> using a redirect? 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher

2018-12-12 Thread GitBox
tillrohrmann opened a new pull request #7295: [FLINK-10748] Decouple 
RestServerEndpoint from Dispatcher
URL: https://github.com/apache/flink/pull/7295
 
 
   ## What is the purpose of the change
   
   This commit decouples the RestServerEndpoint from the Dispatcher by 
converting the RedirectHandler into a LeaderRetrievalHandler, which only 
retrieves the gateway of the current leader instead of redirecting the client. 
This is possible since the serving RestServerEndpoint no longer needs to be 
colocated with the Dispatcher and JobMaster, now that all RPC requests are 
serializable (e.g. no direct access to the ExecutionGraph is needed).
   
   With this change every RestServerEndpoint will automatically proxy to the 
current leader
   without needing to redirect the requesting client.
   
   ## Brief change log
   
   - Remove redirection logic from `RedirectHandler`
   - Rename `RedirectHandler` into `LeaderRetrievalHandler`
   - Remove `restAddressFuture` from all REST handlers except for the 
`JarListHandler`
   
   ## Verifying this change
   
   - Tested manually
   - Adapted former `RedirectHandlerTest` which is now called 
`LeaderRetrievalHandlerTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server

2018-12-12 Thread GitBox
yanghua commented on issue #7263: [FLINK-11081] Support binding port range for 
REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446645970
 
 
   @zentol updated this PR


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-12 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r241075877
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   hmm, yes, the generic Exception cannot provide more detailed information. 
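   For readers of the thread, a rough sketch of the binding loop this PR 
introduces (simplified; `bootstrap`, `restBindAddress` and the 
`restBindPortRange` string are assumed to exist as in the surrounding code, and 
error handling is elided):
   
   ```java
   // try each port from the configured range until one bind succeeds
   Iterator<Integer> ports = NetUtils.getPortRangeFromString(restBindPortRange);
   ChannelFuture channel = null;
   while (ports.hasNext() && channel == null) {
       int port = ports.next();
       try {
           channel = bootstrap.bind(restBindAddress, port).sync();
       } catch (Exception e) {
           channel = null; // port taken -- fall through and try the next one
       }
   }
   if (channel == null) {
       throw new BindException("Could not bind to any port in range " + restBindPortRange);
   }
   ```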


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-12 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r241074645
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
 ##
 @@ -37,6 +37,17 @@
.withDeprecatedKeys(WebOptions.ADDRESS.key(), 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS.key())
.withDescription("The address that the server binds 
itself.");
 
+   /**
+* The port range that the server could bind itself to.
+*/
+   public static final ConfigOption<String> BIND_PORT =
+   key("rest.bind-port")
 
 Review comment:
   You are right.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10748) Jobmanager in HA setup, redirects (307) don't work when behind a load balancer

2018-12-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-10748:
-

Assignee: Till Rohrmann

> Jobmanager in HA setup, redirects (307) don't work when behind a load balancer
> --
>
> Key: FLINK-10748
> URL: https://issues.apache.org/jira/browse/FLINK-10748
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2
>Reporter: Jeroen Steggink
>Assignee: Till Rohrmann
>Priority: Critical
>
> In a HA Jobmanager setup, connecting to a follower results in a redirect 
> (HTTP/1.1 307 Temporary Redirect) to the leader. However, it redirects to an 
> ip address (why is it not the hostname?) which is in another network and not 
> reachable. I have configured hostnames, not ip addresses.
> Wouldn't it be better to proxy the request to another jobmanager instead of 
> using a redirect? 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10748) Jobmanager in HA setup, redirects (307) don't work when behind a load balancer

2018-12-12 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719118#comment-16719118
 ] 

Till Rohrmann commented on FLINK-10748:
---

Thanks for reporting this issue [~jeroens]. I think you are right that the 
current setup is not ideal. I think by now, the {{RestServerEndpoint}} no 
longer needs to be colocated with the {{Dispatcher}} and the {{JobMaster}} as 
was the case before. Thus, it should be possible to simply use the retrieved 
leader gateway in the {{RedirectHandler}} and pass it to the requested handler 
(even if this means that the communication with the {{Dispatcher}} is a remote 
call) instead of redirecting the client to another {{RestServerEndpoint}}.
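A tiny sketch of the idea (method and field names here are illustrative, not 
the actual handler API):

{code:java}
// instead of answering with a 307 redirect, resolve the current leader's
// gateway and serve the request through it, even if that gateway is remote
CompletableFuture<HttpResponse> response = leaderGatewayRetriever
    .getFuture()                        // gateway of the current leader
    .thenCompose(gateway -> respondToRequest(request, gateway));
{code}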

> Jobmanager in HA setup, redirects (307) don't work when behind a load balancer
> --
>
> Key: FLINK-10748
> URL: https://issues.apache.org/jira/browse/FLINK-10748
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2
>Reporter: Jeroen Steggink
>Priority: Critical
>
> In a HA Jobmanager setup, connecting to a follower results in a redirect 
> (HTTP/1.1 307 Temporary Redirect) to the leader. However, it redirects to an 
> IP address (why is it not the hostname?) which is in another network and not 
> reachable. I have configured hostnames, not IP addresses.
> Wouldn't it be better to proxy the request to another jobmanager instead of 
> using a redirect? 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11067) Port TableEnvironments to Java

2018-12-12 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719091#comment-16719091
 ] 

Dawid Wysakowicz edited comment on FLINK-11067 at 12/12/18 3:29 PM:


I think that, since a user should only ever import either {{flink-table-api-java}} 
or {{flink-table-api-scala}} and never both of them, we can have:

{code}

// in flink-table-api-java module
package org.apache.flink.table.api;

public final class TableEnvironment {

   static org.apache.flink.table.api.java.BatchTableEnvironment getTableEnvironment(BatchEnvironment env);

   static org.apache.flink.table.api.java.StreamTableEnvironment getTableEnvironment(StreamEnvironment env);

   private TableEnvironment() {
   }
}

// in flink-table-api-scala module
package org.apache.flink.table.api

object TableEnvironment {
   def getTableEnvironment(env: BatchEnvironment): org.apache.flink.table.api.scala.BatchTableEnvironment = ...

   def getTableEnvironment(env: StreamEnvironment): org.apache.flink.table.api.scala.StreamTableEnvironment = ...
}

{code}

The rest would remain the same as in [~twalthr]'s suggestion. I think that would 
retain backwards compatibility.
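From the user's perspective, call sites could then stay as they are today, e.g. (hypothetical user code, assuming the Java factory above):

{code:java}
// Java: the static methods on the final class resolve.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Scala: the companion-style object with the same name resolves instead:
// val tEnv = TableEnvironment.getTableEnvironment(env)
{code}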


was (Author: dawidwys):
I think a user should only ever import either {{flink-table-api-java}} or 
{{flink-table-api-scala}}, never both of them. I think we can have:

{code}

// in flink-table-api-java module
package org.apache.flink.table.api;

public final class TableEnvironment {

   static org.apache.flink.table.api.java.BatchTableEnvironment getTableEnvironment(BatchEnvironment env);

   static org.apache.flink.table.api.java.StreamTableEnvironment getTableEnvironment(StreamEnvironment env);

   private TableEnvironment() {
   }
}

// in flink-table-api-scala module
package org.apache.flink.table.api

object TableEnvironment {
   def getTableEnvironment(env: BatchEnvironment): org.apache.flink.table.api.scala.BatchTableEnvironment = ...

   def getTableEnvironment(env: StreamEnvironment): org.apache.flink.table.api.scala.StreamTableEnvironment = ...
}

{code}

The rest would remain the same as in [~twalthr]'s suggestion. I think that would 
retain backwards compatibility.

> Port TableEnvironments to Java
> --
>
> Key: FLINK-11067
> URL: https://issues.apache.org/jira/browse/FLINK-11067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, 
> {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided 
> and discussed. Some refactoring and clean up might be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java

2018-12-12 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719091#comment-16719091
 ] 

Dawid Wysakowicz commented on FLINK-11067:
--

I think a user should only ever import either {{flink-table-api-java}} or 
{{flink-table-api-scala}}, never both of them. I think we can have:

{code}

// in flink-table-api-java module
package org.apache.flink.table.api;

public final class TableEnvironment {

   static org.apache.flink.table.api.java.BatchTableEnvironment getTableEnvironment(BatchEnvironment env);

   static org.apache.flink.table.api.java.StreamTableEnvironment getTableEnvironment(StreamEnvironment env);

   private TableEnvironment() {
   }
}

// in flink-table-api-scala module
package org.apache.flink.table.api

object TableEnvironment {
   def getTableEnvironment(env: BatchEnvironment): org.apache.flink.table.api.scala.BatchTableEnvironment = ...

   def getTableEnvironment(env: StreamEnvironment): org.apache.flink.table.api.scala.StreamTableEnvironment = ...
}

{code}

The rest would remain the same as in [~twalthr]'s suggestion. I think that would 
retain backwards compatibility.

> Port TableEnvironments to Java
> --
>
> Key: FLINK-11067
> URL: https://issues.apache.org/jira/browse/FLINK-11067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, 
> {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided 
> and discussed. Some refactoring and clean up might be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials

2018-12-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11126:
---
Labels: pull-request-available  (was: )

> Filter out AMRMToken in the TaskManager credentials
> ---
>
> Key: FLINK-11126
> URL: https://issues.apache.org/jira/browse/FLINK-11126
> Project: Flink
>  Issue Type: Improvement
>  Components: Security, YARN
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, Flink JobManager propagates its storage tokens to TaskManager to 
> meet the requirement of YARN log aggregation (see FLINK-6376). But in this 
> way the AMRMToken is also included in the TaskManager credentials, which 
> could be potentially insecure. We should filter out AMRMToken before setting 
> the tokens to TaskManager's container launch context.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] link3280 opened a new pull request #7294: [FLINK-11126][YARN][security] Filter out AMRMToken in the TaskManager credentials

2018-12-12 Thread GitBox
link3280 opened a new pull request #7294: [FLINK-11126][YARN][security] Filter 
out AMRMToken in the TaskManager credentials
URL: https://github.com/apache/flink/pull/7294
 
 
   ## What is the purpose of the change
   
   Currently, Flink JobManager propagates its storage tokens to TaskManager to 
meet the requirement of YARN log aggregation (see FLINK-6376). But in this way 
the AMRMToken is also included in the TaskManager credentials, which could be 
potentially insecure. 
   
   The PR filters out the AMRMToken before setting the tokens on the 
TaskManager's container launch context, and adds checks for the delegation 
tokens that the JobManager prepares for TaskManagers.
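   
   A rough sketch of the filtering step (illustrative only; the class name is a placeholder and the actual change lives in Flink's YARN utilities):
   
   ```java
   import org.apache.hadoop.security.Credentials;
   import org.apache.hadoop.security.token.Token;
   import org.apache.hadoop.security.token.TokenIdentifier;
   import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
   
   // Copy every token except the AMRMToken into the credentials that are
   // shipped with the TaskManager container launch context.
   public class AmRmTokenFilterSketch {
       static Credentials filterOutAmRmToken(Credentials source) {
           Credentials filtered = new Credentials();
           for (Token<? extends TokenIdentifier> token : source.getAllTokens()) {
               if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                   filtered.addToken(token.getService(), token);
               }
           }
           return filtered;
       }
   }
   ```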
   
   ## Brief change log
   
 - *Filter out AMRMToken before setting the tokens to the TaskManager 
container context.*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Extended YarnApplicationMasterRunnerTest to check delegation tokens in 
the TaskManager executor context.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11144) Run Tests on Java 9

2018-12-12 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-11144:
-
Description: 
When using Java 9, tests should run when invoking {{mvn clean install -Dfast}}

Acceptance criteria:
* Project compiles with {{mvn clean install -Dfast -DskipTests}}
* Tests are runnable

  was:When using Java 9, tests should run when invoking {{mvn clean install 
-Dfast}}


> Run Tests on Java 9
> ---
>
> Key: FLINK-11144
> URL: https://issues.apache.org/jira/browse/FLINK-11144
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When using Java 9, tests should run when invoking {{mvn clean install -Dfast}}
> Acceptance criteria:
> * Project compiles with {{mvn clean install -Dfast -DskipTests}}
> * Tests are runnable



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11144) Run Tests on Java 9

2018-12-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11144:
---
Labels: pull-request-available  (was: )

> Run Tests on Java 9
> ---
>
> Key: FLINK-11144
> URL: https://issues.apache.org/jira/browse/FLINK-11144
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> When using Java 9, tests should run when invoking {{mvn clean install -Dfast}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL opened a new pull request #7293: [FLINK-11144][tests] Make tests run on Java 9

2018-12-12 Thread GitBox
GJL opened a new pull request #7293: [FLINK-11144][tests] Make tests run on 
Java 9
URL: https://github.com/apache/flink/pull/7293
 
 
   ## What is the purpose of the change
   
   *Make tests run on Java 9 by upgrading plugins to the minimum versions 
supported for Java 9: https://cwiki.apache.org/confluence/display/MAVEN/Java+9+-+Jigsaw*
   
   cc: @zentol 
   
   
   ## Brief change log
   
 - *Upgrade surefire plugin.*
 - *Upgrade shade plugin.*
 - *Upgrade compiler plugin.*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *all tests*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-11085) NoClassDefFoundError in presto-s3 filesystem

2018-12-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11085.
---
Resolution: Fixed

Fixed via
1.8.0: 
https://github.com/apache/flink/commit/ea8373e05048ad9dc2a42a2f8d10a9049559e975
1.7.1: 
https://github.com/apache/flink/commit/4ab682bd76ecaa5fef67b077880665b5d86b03c2

> NoClassDefFoundError in presto-s3 filesystem
> 
>
> Key: FLINK-11085
> URL: https://issues.apache.org/jira/browse/FLINK-11085
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
> Attachments: image-2018-12-07-13-04-06-563.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> A user has reported an issue on the ML where using the presto-s3 filesystem 
> fails with an exception due to a missing class. The missing class is indeed 
> filtered out in the shade-plugin configuration.
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
>   at 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
>   at 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
>   at 
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
>   at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
>   at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
>   at 
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #7261: [FLINK-11085][s3] Fix inclusion of presto hadoop classes

2018-12-12 Thread GitBox
tillrohrmann closed pull request #7261: [FLINK-11085][s3] Fix inclusion of 
presto hadoop classes
URL: https://github.com/apache/flink/pull/7261
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 7d19fac97e3..9a5d46e22aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1478,7 +1478,8 @@ under the License.

[maven-shade-plugin configuration hunk; the XML elements of this diff were 
stripped by the mail archiver]


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11028) Disable deployment of flink-fs-tests

2018-12-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719029#comment-16719029
 ] 

Chesnay Schepler commented on FLINK-11028:
--

Eh, maybe not :D The tests require flink-streaming-java, which requires 
flink-runtime, which requires flink-hadoop-fs for testing, resulting in a 
cyclic dependency.

Skipping the deployment can be our last-minute backup plan if no one picks this 
issue up until the next feature freeze.

> Disable deployment of flink-fs-tests
> 
>
> Key: FLINK-11028
> URL: https://issues.apache.org/jira/browse/FLINK-11028
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.8.0
>
>
> The {{flink-fs-tests}} module only contains tests, without any production 
> code or test utilities. As such there should never be a use-case where one 
> would specify a dependency on this module, hence we could skip the deployment 
> entirely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6880: [FLINK-10571][storm] Remove topology support

2018-12-12 Thread GitBox
zentol commented on issue #6880: [FLINK-10571][storm] Remove topology support
URL: https://github.com/apache/flink/pull/6880#issuecomment-446609832
 
 
   @TisonKun Someone has to review the PR to check that only topology-related 
parts have been removed, without impacting the bolt/spout wrapper functionality.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-12 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r241037567
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -283,6 +275,14 @@ public JobExecutionResult execute(String jobName) throws 
ProgramInvocationExcept
return executeRemotely(streamGraph, jarFiles);
}
 
+   /**
+* Execute the job with savepoint restore.
+*/
+   public JobExecutionResult execute(String jobName, 
SavepointRestoreSettings savepointRestoreSettings) throws 
ProgramInvocationException {
+   this.savepointRestoreSettings = savepointRestoreSettings;
+   return execute(jobName);
 
 Review comment:
   I don't want to repeat what is already done in `execute` and 
`executeRemotely`. The member variable is an implementation detail that does 
not concern the interface contract and, as mentioned, all other execution 
parameters are already derived from the instance. (BTW execute is a one-time 
operation with side effect - it clears the transformations.)
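   
   For reference, the caller-side usage this enables could look like the following sketch (host, port, jar path and savepoint path are placeholders):
   
   ```java
   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
   import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
   
   public class SavepointRestoreExample {
       public static void main(String[] args) throws Exception {
           RemoteStreamEnvironment env =
               new RemoteStreamEnvironment("jobmanager-host", 8081, "/path/to/job.jar");
           // ... define the streaming pipeline on env ...
           // Restore from the given savepoint, then execute; all other
           // execution parameters are derived from the environment instance.
           JobExecutionResult result = env.execute(
               "my-job", SavepointRestoreSettings.forPath("hdfs:///savepoints/sp-42", true));
       }
   }
   ```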


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11028) Disable deployment of flink-fs-tests

2018-12-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719024#comment-16719024
 ] 

Chesnay Schepler commented on FLINK-11028:
--

The contained tests are exclusively for HDFS, so this might be as easy as 
copying them to {{flink-hadoop-fs}} and adding a few test dependencies. Running 
this approach on Travis: https://travis-ci.org/zentol/flink/builds/467022202

> Disable deployment of flink-fs-tests
> 
>
> Key: FLINK-11028
> URL: https://issues.apache.org/jira/browse/FLINK-11028
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.8.0
>
>
> The {{flink-fs-tests}} module only contains tests, without any production 
> code or test utilities. As such there should never be a use-case where one 
> would specify a dependency on this module, hence we could skip the deployment 
> entirely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11101) Ineffective openjdk exclusion Presto S3 FileSystem module

2018-12-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11101:
---
Labels: pull-request-available  (was: )

> Ineffective openjdk exclusion Presto S3 FileSystem module
> -
>
> Key: FLINK-11101
> URL: https://issues.apache.org/jira/browse/FLINK-11101
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, FileSystem
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> The {{presto-s3-fs}} module defines the following exclusion in the 
> shade-plugin:
> {code:java}
> <exclude>org.openjdk.jol</exclude>
> {code}
> This exclusion has no effect on the resulting artifact. We could think about 
> removing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol opened a new pull request #7292: [FLINK-11101][S3] Ban openjdk.jol dependencies

2018-12-12 Thread GitBox
zentol opened a new pull request #7292: [FLINK-11101][S3] Ban openjdk.jol 
dependencies
URL: https://github.com/apache/flink/pull/7292
 
 
   ## What is the purpose of the change
   
   This PR removes a redundant exclusion for `openjdk.jol` from the 
presto-s3-filesystem pom, and adds a safeguard via the `maven-enforcer-plugin` 
to prevent this dependency from showing up in the dependency tree again. This 
guarantees that the dependency section properly excludes this dependency.
   
   This is a better approach than relying on the shade-plugin configuration 
since the dependency could still show up in the dependency tree of the project 
(if the dependency section does not exclude it correctly), which could ring 
alarm bells during release testing for no reason.
   
   ## Verifying this change
   
   You can manually verify the effectiveness of the enforcer by removing the 
`openjdk.jol` exclusion from `presto-hive` and running `mvn validate`.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

