[jira] [Commented] (FLINK-10143) date_format throws CodeGenException
[ https://issues.apache.org/jira/browse/FLINK-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719879#comment-16719879 ]

vinoyang commented on FLINK-10143:
--

Hi [~twalthr]

We also ran into this problem and then found this issue. Could we raise its priority? Alternatively, we could temporarily remove this function from the official documentation, or add a description of the problem there. Otherwise it will cause trouble for users.

> date_format throws CodeGenException
> ---
>
> Key: FLINK-10143
> URL: https://issues.apache.org/jira/browse/FLINK-10143
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.6.0
> Reporter: sean.miao
> Priority: Major
> Attachments: image-2018-08-14-18-11-40-144.png
>
>
> "insert into mtmp_sink SELECT DATE_FORMAT(time_str2tms,'%Y%d%m') FROM mtmp_source";
> We have recently used the date_format method like the above sql, but found that this method does not work.
> !image-2018-08-14-18-11-40-144.png!
> error msg is :
> org.apache.flink.table.codegen.CodeGenException: Incompatible types of expression and result type.
> Expression[GeneratedExpression(result$6,isNull$7,
> java.lang.String result$4 = "%Y, %d %M";
> boolean isNull$7 = isNull$3 || false;
> java.lang.String result$6 = "";
> if (!isNull$7) {
>
> result$6 = dateFormatter$5.print(result$2);
>
> isNull$7 = (result$6 == null);
>
> }
> ,String,false)] type is [String], result type is [Timestamp]
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:380)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$generateResultExpression$2.apply(CodeGenerator.scala:378)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:378)
> at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:336)
> at org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:45)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
> at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:999)
> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:926)
> at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:372)
> at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
> at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
> at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241286479 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import org.apache.calcite.rex.RexLiteral +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.dataview._ +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.functions.UserDefinedAggregateFunction +import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.utils.RetractableCollector +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.table.codegen.TableAggregationCodeGenerator._ +import org.apache.flink.table.plan.schema.RowSchema + +/** + * A base code generator for generating [[GeneratedTableAggregations]]. 
+ * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input type information about the input of the Function + * @param constants constant expressions that act like a second input in the + * parameter indices. + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class + * identifier. + * @param physicalInputTypes Physical input row types + * @param tableAggOutputTypes Output types of TableAggregateFunction + * @param outputSchema The type of the rows emitted by TableAggregate operator + * @param aggregates All aggregate functions + * @param aggFields Indexes of the input fields for all aggregate functions + * @param aggMapping The mapping of aggregates to output fields + * @param isDistinctAggs The flag array indicating whether it is distinct aggregate. + * @param isStateBackedDataViews a flag to indicate if distinct filter uses state backend. + * @param partialResults A flag defining whether final or partial results (accumulators) + * are set + * to the output row. + * @param fwdMapping The mapping of input fields to output fields + * @param mergeMapping An optional mapping to specify the accumulators to merge. If not + * set, we + * assume that both rows have the accumulators at the same position. + * @param outputArity The number of fields in the output row. 
+ * @param needRetract a flag to indicate if the aggregate needs the retract method + * @param needEmitWithRetract a flag to indicate if the aggregate needs to output retractions + * when update + * @param needMerge a flag to indicate if the aggregate needs the merge method + * @param needReset a flag to indicate if the aggregate needs the resetAccumulator + * method + * @param accConfig Data view specification for accumulators + */ +class TableAggregationCodeGenerator( +config: TableConfig, +nullableInput: Boolean, +input: TypeInformation[_ <: Any], +constants: Option[Seq[RexLiteral]], +name: String, +physicalInputTypes: Seq[TypeInformation[_]], +tableAggOutputTypes: TypeInformation[_], +outputSchema: RowSchema, +aggregates: Array[UserDefinedAggregateFunction[_ <: Any, _ <: Any]], +aggFields: Array[Array[Int]], +aggMapping: Array[Int], +isDistinctAggs: Array[Boolean], +isStateBackedDataViews: Boolean, +partialResults: Boolean, +fwdMapping: Array[Int]
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241279604 ## File path: docs/dev/table/tableApi.md ## @@ -550,6 +550,82 @@ val result = orders.distinct() {% top %} +### TableAggregations + + + + + + + + Operators + Description + + + + + +GroupBy TableAggregation +Streaming +Result Updating + + +Similar to a GroupBy Aggregation. Groups the rows on the grouping keys with a following running table aggregation operator to aggregate rows group-wise. The difference from an AggregateFunction is that TableAggregateFunction may return 0 or more records for a group. You have to close the "flatAggregate" with a select statement. And the select statement does not support * and aggregate functions. Review comment: IMO, the select statement can support * This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
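To make the operator description quoted above concrete, here is a minimal in-memory sketch of what "group by key, then emit several rows per group" means. It is only an illustration in plain Java: `FlatAggregateSketch` and `top2PerKey` are hypothetical names, not Flink API, and the sketch works on a finished list rather than on a running stream, so it says nothing about how the PR implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory illustration only; none of these names are Flink API. */
class FlatAggregateSketch {

    /** Groups (key, value) rows by key and emits up to two "key,value,rank" rows per group. */
    static List<String> top2PerKey(List<String[]> rows) {
        // TreeMap keeps the groups in key order so the output is deterministic.
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(Integer.parseInt(row[1]));
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            List<Integer> values = group.getValue();
            values.sort(Collections.reverseOrder());
            // Unlike a scalar aggregate, one group may contribute several output rows.
            for (int rank = 1; rank <= Math.min(2, values.size()); rank++) {
                out.add(group.getKey() + "," + values.get(rank - 1) + "," + rank);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"a", "3"}, new String[] {"b", "7"},
            new String[] {"a", "9"}, new String[] {"a", "5"});
        top2PerKey(rows).forEach(System.out::println);
    }
}
```

Each output row carries the grouping key plus the columns produced by the aggregate, which is why the description requires a closing select to name the result columns.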
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241281902 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: + * - createAccumulator, + * - accumulate, and + * - emitValue or emitValueWithRetract. + * + * There are a few other methods that can be optional to have: + * - retract, + * - merge, and + * - resetAccumulator. + * + * All these methods must be declared publicly, not static, and named exactly as the names + * mentioned above. The methods {@link #createAccumulator()} are defined in + * the {@link TableAggregateFunction} functions, while other methods are explained below. 
+ * + * + * {@code + * Processes the input values and update the provided accumulator instance. The method + * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction + * requires at least one accumulate() method. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void accumulate(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Retracts the input values from the accumulator instance. The current design assumes the + * inputs are the values that have been previously accumulated. The method retract can be + * overloaded with different custom types and arguments. This function must be implemented for + * data stream bounded OVER aggregates. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void retract(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Merges a group of accumulator instances into one accumulator instance. This function must be + * implemented for data stream session window grouping aggregates and data set grouping aggregates. + * + * param: accumulator the accumulator which will keep the merged aggregate results. It should + *be noted that the accumulator may contain the previous aggregated + *results. Therefore user should not replace or clean this instance in the + *custom merge method. + * param: its an java.lang.Iterable pointed to a group of accumulators that will be + *merged. + * + * public void merge(ACC accumulator, java.lang.Iterable iterable) + * } + * + * + * + * {@code + * Resets the accumulator for this AggregateFunction. This function must be implemented for + * data set grouping aggregates. 
+ * + * param: accumulator the accumulator which needs to be reset + * + * public void resetAccumulator(ACC accumulator) + * } + * + * + * + * {@code + * Output data incrementally in upsert or append mode. For example, if we emit data for a TopN + * TableAggregateFunction, we don't have to output all top N elements each time a record comes. + * It is more efficient to output data incrementally in upsert mode, i.e, only output data whose + * rank has been changed. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: out the collector used to output data. + * + * public void emitValue(ACC accumulator, RetractableCollector out) + * } + * + * + * + * {@code + * Output data incrementally in retract mode. Once there is an update, we have to retract old + * records before send new updated ones. Review comment: send new updat
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241293061 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableAggregateTest.scala ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.Func0 +import org.apache.flink.table.utils.{EmptyTableAggFunc, TableTestBase, TopN} +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +class TableAggregateTest extends TableTestBase { + Review comment: Add an alias test This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241280067 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom Review comment: an -> a This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241281516 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: + * - createAccumulator, + * - accumulate, and + * - emitValue or emitValueWithRetract. + * + * There are a few other methods that can be optional to have: + * - retract, + * - merge, and + * - resetAccumulator. + * + * All these methods must be declared publicly, not static, and named exactly as the names + * mentioned above. The methods {@link #createAccumulator()} are defined in + * the {@link TableAggregateFunction} functions, while other methods are explained below. 
+ * + * + * {@code + * Processes the input values and update the provided accumulator instance. The method + * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction + * requires at least one accumulate() method. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void accumulate(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Retracts the input values from the accumulator instance. The current design assumes the + * inputs are the values that have been previously accumulated. The method retract can be + * overloaded with different custom types and arguments. This function must be implemented for + * data stream bounded OVER aggregates. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void retract(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Merges a group of accumulator instances into one accumulator instance. This function must be + * implemented for data stream session window grouping aggregates and data set grouping aggregates. + * + * param: accumulator the accumulator which will keep the merged aggregate results. It should + *be noted that the accumulator may contain the previous aggregated + *results. Therefore user should not replace or clean this instance in the + *custom merge method. + * param: its an java.lang.Iterable pointed to a group of accumulators that will be + *merged. + * + * public void merge(ACC accumulator, java.lang.Iterable iterable) + * } + * + * + * + * {@code + * Resets the accumulator for this AggregateFunction. 
This function must be implemented for Review comment: AggregateFunction -> TableAggregateFunction
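The method contract described in the Javadoc quoted above (createAccumulator, accumulate, merge, resetAccumulator, emitValue) can be sketched in plain Java as follows. This is a hedged illustration, not the class from the PR: `Top2Sketch`, `Top2Acc` and the `top2` helper are hypothetical, the class does not extend `TableAggregateFunction`, and a `List<Integer>` stands in for the collector. The optional retract method is left out because a top-2 accumulator that keeps only two values cannot undo an arbitrary earlier input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java stand-in for the contract; it does not extend any Flink class. */
class Top2Sketch {

    /** Hypothetical accumulator holding the two largest values seen so far. */
    static final class Top2Acc {
        Integer first;   // null until a value has been accumulated
        Integer second;
    }

    Top2Acc createAccumulator() {
        return new Top2Acc();
    }

    /** Updates the accumulator with one input value; null inputs are ignored. */
    void accumulate(Top2Acc acc, Integer value) {
        if (value == null) {
            return;
        }
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    /** Folds other accumulators into acc without replacing or clearing acc. */
    void merge(Top2Acc acc, Iterable<Top2Acc> others) {
        for (Top2Acc other : others) {
            accumulate(acc, other.first);
            accumulate(acc, other.second);
        }
    }

    void resetAccumulator(Top2Acc acc) {
        acc.first = null;
        acc.second = null;
    }

    /** Emits zero, one or two values, largest first. */
    void emitValue(Top2Acc acc, List<Integer> out) {
        if (acc.first != null) {
            out.add(acc.first);
        }
        if (acc.second != null) {
            out.add(acc.second);
        }
    }

    /** Convenience helper: accumulate all values, then emit. */
    static List<Integer> top2(Integer... values) {
        Top2Sketch fn = new Top2Sketch();
        Top2Acc acc = fn.createAccumulator();
        for (Integer v : values) {
            fn.accumulate(acc, v);
        }
        List<Integer> out = new ArrayList<>();
        fn.emitValue(acc, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(top2(3, 9, 5));
        System.out.println(top2());
    }
}
```

Note that emitValue here re-emits the full result on every call; the incremental upsert and retract variants discussed in the Javadoc would instead emit only the rows whose rank changed.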
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241285918 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ## @@ -295,6 +295,15 @@ abstract class CodeGenerator( generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType, resultFieldNames) } + /** +* Generates an expression from the input. +*/ + def generateFieldAccessExprs: Seq[GeneratedExpression] = { Review comment: Is it possible to avoid this change? Seems that you can use CodeGenerator#generateConverterResultExpression
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241286801 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import org.apache.calcite.rex.RexLiteral +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.dataview._ +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.functions.UserDefinedAggregateFunction +import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.utils.RetractableCollector +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.table.codegen.TableAggregationCodeGenerator._ +import org.apache.flink.table.plan.schema.RowSchema + +/** + * A base code generator for generating [[GeneratedTableAggregations]]. 
+ * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input type information about the input of the Function + * @param constants constant expressions that act like a second input in the + * parameter indices. + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class + * identifier. + * @param physicalInputTypes Physical input row types + * @param tableAggOutputTypes Output types of TableAggregateFunction + * @param outputSchema The type of the rows emitted by TableAggregate operator + * @param aggregates All aggregate functions + * @param aggFields Indexes of the input fields for all aggregate functions + * @param aggMapping The mapping of aggregates to output fields + * @param isDistinctAggs The flag array indicating whether it is distinct aggregate. + * @param isStateBackedDataViews a flag to indicate if distinct filter uses state backend. + * @param partialResults A flag defining whether final or partial results (accumulators) + * are set + * to the output row. + * @param fwdMapping The mapping of input fields to output fields + * @param mergeMapping An optional mapping to specify the accumulators to merge. If not + * set, we + * assume that both rows have the accumulators at the same position. + * @param outputArity The number of fields in the output row. 
+ * @param needRetract a flag to indicate if the aggregate needs the retract method + * @param needEmitWithRetract a flag to indicate if the aggregate needs to output retractions + * when update + * @param needMerge a flag to indicate if the aggregate needs the merge method + * @param needReset a flag to indicate if the aggregate needs the resetAccumulator + * method + * @param accConfig Data view specification for accumulators + */ +class TableAggregationCodeGenerator( +config: TableConfig, +nullableInput: Boolean, +input: TypeInformation[_ <: Any], +constants: Option[Seq[RexLiteral]], +name: String, +physicalInputTypes: Seq[TypeInformation[_]], +tableAggOutputTypes: TypeInformation[_], +outputSchema: RowSchema, +aggregates: Array[UserDefinedAggregateFunction[_ <: Any, _ <: Any]], +aggFields: Array[Array[Int]], +aggMapping: Array[Int], +isDistinctAggs: Array[Boolean], +isStateBackedDataViews: Boolean, +partialResults: Boolean, +fwdMapping: Array[Int]
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241280390 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: + * - createAccumulator, + * - accumulate, and + * - emitValue or emitValueWithRetract. + * + * There are a few other methods that can be optional to have: + * - retract, + * - merge, and + * - resetAccumulator. + * + * All these methods must be declared publicly, not static, and named exactly as the names + * mentioned above. The methods {@link #createAccumulator()} are defined in Review comment: methods -> method This is an automated message from the Apache Git Service. 
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241291444 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTableAggregate.scala ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.functions.utils.TableAggSqlFunction +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.util.Logging + +/** + * + * Flink RelNode for data stream unbounded table aggregate + * + * @param cluster Cluster of the RelNode, represent for an environment of related + * relational expressions during the optimization of a query. + * @param traitSet Trait set of the RelNode + * @param inputNode The input RelNode of aggregation + * @param schema The type of the rows emitted by this RelNode + * @param inputSchema The type of the rows consumed by this RelNode + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param groupings The position (in the input Row) of the grouping keys + */ +class DataStreamTableAggregate( Review comment: Would it be better to use DataStreamGroupTableAggregate?
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241281679 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: + * - createAccumulator, + * - accumulate, and + * - emitValue or emitValueWithRetract. + * + * There are a few other methods that can be optional to have: + * - retract, + * - merge, and + * - resetAccumulator. + * + * All these methods must be declared publicly, not static, and named exactly as the names + * mentioned above. The methods {@link #createAccumulator()} are defined in + * the {@link TableAggregateFunction} functions, while other methods are explained below. 
+ * + * + * {@code + * Processes the input values and update the provided accumulator instance. The method + * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction + * requires at least one accumulate() method. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void accumulate(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Retracts the input values from the accumulator instance. The current design assumes the + * inputs are the values that have been previously accumulated. The method retract can be + * overloaded with different custom types and arguments. This function must be implemented for + * data stream bounded OVER aggregates. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void retract(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Merges a group of accumulator instances into one accumulator instance. This function must be + * implemented for data stream session window grouping aggregates and data set grouping aggregates. + * + * param: accumulator the accumulator which will keep the merged aggregate results. It should + *be noted that the accumulator may contain the previous aggregated + *results. Therefore user should not replace or clean this instance in the + *custom merge method. + * param: its an java.lang.Iterable pointed to a group of accumulators that will be + *merged. + * + * public void merge(ACC accumulator, java.lang.Iterable iterable) + * } + * + * + * + * {@code + * Resets the accumulator for this AggregateFunction. This function must be implemented for + * data set grouping aggregates. 
+ * + * param: accumulator the accumulator which needs to be reset + * + * public void resetAccumulator(ACC accumulator) + * } + * + * + * + * {@code + * Output data incrementally in upsert or append mode. For example, if we emit data for a TopN + * TableAggregateFunction, we don't have to output all top N elements each time a record comes. Review comment: have to -> need
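For readers following the Javadoc quoted in this message, the three required methods (createAccumulator, accumulate, emitValue) can be illustrated with a minimal sketch. It is plain Java with no Flink dependency: the class names `Top2Sketch` and `Top2Accum` are hypothetical, and `java.util.function.Consumer` stands in for whatever collector type the PR actually passes to emitValue, so treat it as an illustration of the contract rather than the PR's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a table aggregate; this is NOT the Flink API. */
public class Top2Sketch {

    /** Accumulator holding the two largest values seen so far. */
    public static class Top2Accum {
        // Integer.MIN_VALUE is used as an "empty slot" marker (a simplification:
        // an actual input of Integer.MIN_VALUE would be treated as absent).
        public int first = Integer.MIN_VALUE;
        public int second = Integer.MIN_VALUE;
    }

    /** Creates an empty accumulator for a new group. */
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    /** Folds one input value into the accumulator. */
    public void accumulate(Top2Accum acc, int value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    /** Emits zero, one, or two records, unlike a scalar aggregate that emits exactly one. */
    public void emitValue(Top2Accum acc, Consumer<Integer> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.accept(acc.first);
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.accept(acc.second);
        }
    }

    public static void main(String[] args) {
        Top2Sketch fn = new Top2Sketch();
        Top2Accum acc = fn.createAccumulator();
        for (int v : new int[] {3, 9, 1, 7}) {
            fn.accumulate(acc, v);
        }
        List<Integer> emitted = new ArrayList<>();
        fn.emitValue(acc, emitted::add);
        System.out.println(emitted); // prints [9, 7]
    }
}
```

An incremental variant in the spirit of emitValueWithRetract would additionally remember what was emitted last time and output only the difference, which is the point the quoted Javadoc makes about TopN.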
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241289780 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTableAggregate.scala ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.functions.utils.TableAggSqlFunction +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.util.Logging + +/** + * + * Flink RelNode for data stream unbounded table aggregate + * + * @param cluster Cluster of the RelNode, represent for an environment of related + *relational expressions during the optimization of a query. 
+ * @param traitSet Trait set of the RelNode + * @param inputNode The input RelNode of aggregation + * @param schema The type of the rows emitted by this RelNode + * @param inputSchema The type of the rows consumed by this RelNode + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param groupings The position (in the input Row) of the grouping keys + */ +class DataStreamTableAggregate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +schema: RowSchema, +inputSchema: RowSchema, +val namedAggregates: Seq[CalcitePair[AggregateCall, String]], +val groupings: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) +with CommonAggregate +with DataStreamRel +with Logging { + + override def deriveRowType() = schema.relDataType + + override def needsUpdatesAsRetraction = true + + override def producesUpdates = true + + override def consumesRetractions = true + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamTableAggregate( + cluster, + traitSet, + inputs.get(0), + schema, + inputSchema, + namedAggregates, + groupings) + } + + override def toString: String = { +s"TableAggregate(${ + if (!groupings.isEmpty) { +s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), " + } else { +"" + } +} flatAggregate:(${aggregationToString( + inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString( +inputSchema.relDataType, groupings), !groupings.isEmpty) + .item("flatAggregate", aggregationToString( +inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + +val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType =
CRowTypeInfo(schema.typeInfo) +val aggCall = namedAggregates(0).left + +val processFunction = AggregateUtil.createTableAggregateFunction( + tableEnv.getConfig, + false, + inputSchema.typeInfo, + None, + namedAggregates, + inputSchema.relDataType, + inputSchema.fieldTypeInfos, +
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241293985 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableAggregateITCase.scala ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, Types} +import org.apache.flink.table.runtime.utils._ +import org.apache.flink.table.utils.{Top3WithRetractInput, TopN} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.mutable + +/** + * Tests of groupby (without window) aggregations + */ +class TableAggregateITCase extends StreamingWithStateTestBase { Review comment: Add a distinct ITCase
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241287190 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala ## @@ -71,6 +71,16 @@ case class GeneratedAggregationsFunction( name: String, code: String) +/** + * Describes a generated table aggregate helper function + * + * @param name class name of the generated Function. + * @param code code of the generated Function. + */ +case class GeneratedTableAggregationsFunction( Review comment: Maybe we can use GeneratedAggregationsFunction?
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241290354 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import org.apache.calcite.rex.RexLiteral +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.dataview._ +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.functions.UserDefinedAggregateFunction +import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.utils.RetractableCollector +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.table.codegen.TableAggregationCodeGenerator._ +import org.apache.flink.table.plan.schema.RowSchema + +/** + * A base code generator for generating [[GeneratedTableAggregations]]. 
+ * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input type information about the input of the Function + * @param constants constant expressions that act like a second input in the + * parameter indices. + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class + * identifier. + * @param physicalInputTypes Physical input row types + * @param tableAggOutputTypes Output types of TableAggregateFunction + * @param outputSchema The type of the rows emitted by TableAggregate operator + * @param aggregates All aggregate functions + * @param aggFields Indexes of the input fields for all aggregate functions + * @param aggMapping The mapping of aggregates to output fields + * @param isDistinctAggs The flag array indicating whether it is distinct aggregate. + * @param isStateBackedDataViews a flag to indicate if distinct filter uses state backend. + * @param partialResults A flag defining whether final or partial results (accumulators) + * are set + * to the output row. + * @param fwdMapping The mapping of input fields to output fields + * @param mergeMapping An optional mapping to specify the accumulators to merge. If not + * set, we + * assume that both rows have the accumulators at the same position. + * @param outputArity The number of fields in the output row. 
+ * @param needRetract a flag to indicate if the aggregate needs the retract method + * @param needEmitWithRetract a flag to indicate if the aggregate needs to output retractions + * when update + * @param needMerge a flag to indicate if the aggregate needs the merge method + * @param needReset a flag to indicate if the aggregate needs the resetAccumulator + * method + * @param accConfig Data view specification for accumulators + */ +class TableAggregationCodeGenerator( +config: TableConfig, +nullableInput: Boolean, +input: TypeInformation[_ <: Any], +constants: Option[Seq[RexLiteral]], +name: String, +physicalInputTypes: Seq[TypeInformation[_]], +tableAggOutputTypes: TypeInformation[_], +outputSchema: RowSchema, Review comment: tableAggOutputTypes can be computed from outputSchema
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241279917 ## File path: docs/dev/table/tableApi.md ## @@ -550,6 +550,82 @@ val result = orders.distinct() {% top %} +### TableAggregations + + + + + + + + Operators + Description + + + + + +GroupBy TableAggregation +Streaming +Result Updating + + +Similar to a GroupBy Aggregation. Groups the rows on the grouping keys with a following running table aggregation operator to aggregate rows group-wise. The difference from an AggregateFunction is that TableAggregateFunction may return 0 or more records for a group. You have to close the "flatAggregate" with a select statement. And the select statement does not support * and aggregate functions. +{% highlight java %} +TableAggregateFunction tableAggFunc = new MyTableAggregateFunction(); +tableEnv.registerFunction("myTableAggFunc", tableAggFunc); +Table orders = tableEnv.scan("Orders"); +Table result = orders +.groupBy("a") +.flatAggregate("myTableAggFunc(a, b, c)") +.select("_1 as a, _2 as b"); +{% endhighlight %} +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details. + + + + + + + + + + + + + Operators + Description + + + + + + +GroupBy TableAggregation +Streaming +Result Updating + + +Similar to a GroupBy Aggregation. Groups the rows on the grouping keys with a following running table aggregation operator to aggregate rows group-wise. The difference from an AggregateFunction is that TableAggregateFunction may return 0 or more records for a group. You have to close the "flatAggregate" with a select statement. And the select statement does not support * and aggregate functions. 
Review comment: IMO, the select statement can support *
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241284274 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -358,8 +358,12 @@ abstract class StreamTableEnvironment( tableKeys match { case Some(keys) => upsertSink.setKeyFields(keys) case None if isAppendOnlyTable => upsertSink.setKeyFields(null) - case None if !isAppendOnlyTable => throw new TableException( -"UpsertStreamTableSink requires that Table has full primary keys if it is updated.") + case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null => +throw new TableException( + "UpsertStreamTableSink requires that Table has full primary keys if it is updated. " + +"You can enforce your sink keys by override enforceKeyFields method in " + +"UpsertStreamTableSink interface if you really want to, but be carefull.") Review comment: carefull -> careful
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241286060 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/TableAggregationCodeGenerator.scala ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import org.apache.calcite.rex.RexLiteral +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.dataview._ +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.functions.UserDefinedAggregateFunction +import org.apache.flink.table.runtime.aggregate.GeneratedTableAggregations +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.utils.RetractableCollector +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.table.codegen.TableAggregationCodeGenerator._ +import org.apache.flink.table.plan.schema.RowSchema + +/** + * A base code generator for generating [[GeneratedTableAggregations]]. 
+ * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input type information about the input of the Function + * @param constants constant expressions that act like a second input in the + * parameter indices. + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class + * identifier. + * @param physicalInputTypes Physical input row types + * @param tableAggOutputTypes Output types of TableAggregateFunction + * @param outputSchema The type of the rows emitted by TableAggregate operator + * @param aggregates All aggregate functions + * @param aggFields Indexes of the input fields for all aggregate functions + * @param aggMapping The mapping of aggregates to output fields + * @param isDistinctAggs The flag array indicating whether it is distinct aggregate. + * @param isStateBackedDataViews a flag to indicate if distinct filter uses state backend. + * @param partialResults A flag defining whether final or partial results (accumulators) + * are set + * to the output row. + * @param fwdMapping The mapping of input fields to output fields + * @param mergeMapping An optional mapping to specify the accumulators to merge. If not + * set, we + * assume that both rows have the accumulators at the same position. + * @param outputArity The number of fields in the output row. 
+ * @param needRetract a flag to indicate if the aggregate needs the retract method + * @param needEmitWithRetract a flag to indicate if the aggregate needs to output retractions + * when update + * @param needMerge a flag to indicate if the aggregate needs the merge method + * @param needReset a flag to indicate if the aggregate needs the resetAccumulator + * method + * @param accConfig Data view specification for accumulators + */ +class TableAggregationCodeGenerator( +config: TableConfig, +nullableInput: Boolean, +input: TypeInformation[_ <: Any], +constants: Option[Seq[RexLiteral]], +name: String, +physicalInputTypes: Seq[TypeInformation[_]], +tableAggOutputTypes: TypeInformation[_], +outputSchema: RowSchema, +aggregates: Array[UserDefinedAggregateFunction[_ <: Any, _ <: Any]], +aggFields: Array[Array[Int]], +aggMapping: Array[Int], +isDistinctAggs: Array[Boolean], +isStateBackedDataViews: Boolean, +partialResults: Boolean, +fwdMapping: Array[Int]
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241281005 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: + * - createAccumulator, + * - accumulate, and + * - emitValue or emitValueWithRetract. + * + * There are a few other methods that can be optional to have: + * - retract, + * - merge, and + * - resetAccumulator. + * + * All these methods must be declared publicly, not static, and named exactly as the names + * mentioned above. The methods {@link #createAccumulator()} are defined in + * the {@link TableAggregateFunction} functions, while other methods are explained below. 
+ * + * + * {@code + * Processes the input values and update the provided accumulator instance. The method + * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction + * requires at least one accumulate() method. + * + * param: accumulator the accumulator which contains the current aggregated results + * param: [user defined inputs] the input value (usually obtained from a new arrived data). + * + * public void accumulate(ACC accumulator, [user defined inputs]) + * } + * + * + * + * {@code + * Retracts the input values from the accumulator instance. The current design assumes the + * inputs are the values that have been previously accumulated. The method retract can be + * overloaded with different custom types and arguments. This function must be implemented for + * data stream bounded OVER aggregates. Review comment: TableAggregateFunction is not supported in OVER.
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241280457 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: + * - createAccumulator, + * - accumulate, and + * - emitValue or emitValueWithRetract. + * + * There are a few other methods that can be optional to have: + * - retract, + * - merge, and + * - resetAccumulator. + * + * All these methods must be declared publicly, not static, and named exactly as the names + * mentioned above. The methods {@link #createAccumulator()} are defined in + * the {@link TableAggregateFunction} functions, while other methods are explained below. 
+ * Review comment: TableAggregateFunction -> UserDefinedAggregateFunction. The comments in AggregateFunction should also be updated.
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241289558 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTableAggregate.scala ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.functions.utils.TableAggSqlFunction +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.util.Logging + +/** + * + * Flink RelNode for data stream unbounded table aggregate + * + * @param cluster Cluster of the RelNode, represent for an environment of related + *relational expressions during the optimization of a query. 
+ * @param traitSetTrait set of the RelNode + * @param inputNode The input RelNode of aggregation + * @param schema The type of the rows emitted by this RelNode + * @param inputSchema The type of the rows consumed by this RelNode + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param groupings The position (in the input Row) of the grouping keys + */ +class DataStreamTableAggregate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +schema: RowSchema, +inputSchema: RowSchema, +val namedAggregates: Seq[CalcitePair[AggregateCall, String]], +val groupings: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) +with CommonAggregate +with DataStreamRel +with Logging { + + override def deriveRowType() = schema.relDataType + + override def needsUpdatesAsRetraction = true + + override def producesUpdates = true + + override def consumesRetractions = true + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamTableAggregate( + cluster, + traitSet, + inputs.get(0), + schema, + inputSchema, + namedAggregates, + groupings) + } + + override def toString: String = { +s"TableAggregate(${ + if (!groupings.isEmpty) { +s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), " + } else { +"" + } +} flatAggregate:(${aggregationToString( + inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString( +inputSchema.relDataType, groupings), !groupings.isEmpty) + .item("flatAggregate", aggregationToString( +inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: StreamQueryConfig): DataStream[CRow] = { + Review comment: can add some state retention warning like done in DataStreamGroupAggregate This is an 
automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241283978 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/RetractableCollector.java ## @@ -16,21 +16,29 @@ * limitations under the License. */ -package org.apache.flink.table.sinks +package org.apache.flink.table.utils; -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.Table +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Collector; /** - * Defines an external [[TableSink]] to emit streaming [[Table]] with only insert changes. - * - * If the [[Table]] is also modified by update or delete changes, a - * [[org.apache.flink.table.api.TableException]] will be thrown. - * - * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. - */ -trait AppendStreamTableSink[T] extends StreamTableSink[T] { + * Collects a record and forwards it. The collector can output retract messages with retract method. + * Currently, RetractableCollector is used in TableAggregateFunction. Review comment: RetractableCollector can only be used in TableAggregateFunction. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API
dianfu commented on a change in pull request #7209: [FLINK-10977][table] Add UnBounded FlatAggregate operator to streaming Table API URL: https://github.com/apache/flink/pull/7209#discussion_r241280112 ## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base class for user-defined table aggregates. + * + * The behavior of an {@link TableAggregateFunction} can be defined by implementing a series of custom + * methods. An {@link TableAggregateFunction} needs at least three methods: Review comment: An -> A This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhijiangW commented on issue #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
zhijiangW commented on issue #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel URL: https://github.com/apache/flink/pull/7199#issuecomment-446865059 Thanks for your reviews! @pnowojski I would consider your above suggestions and double check the travis failure. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel URL: https://github.com/apache/flink/pull/7199#discussion_r241289363 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -68,20 +68,29 @@ private final boolean flushAlways; + private final boolean isBroadcastSelector; + private Counter numBytesOut = new SimpleCounter(); private Counter numBuffersOut = new SimpleCounter(); public RecordWriter(ResultPartitionWriter writer) { - this(writer, new RoundRobinChannelSelector()); + this(writer, new RoundRobinChannelSelector(), false); } - @SuppressWarnings("unchecked") - public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) { - this(writer, channelSelector, false); + public RecordWriter( Review comment: To be honest, I am also a bit embarrassed for current implementation. It does exist the redundant and potential inconsistent issues as you mentioned. The first way you mentioned can solve above issues if we can check whether the selector is broadcast or not internally in `RecordWriter`. For the second way we can not only pass the boolean argument without selector, because once the boolean value is false, we should rely on the specific selector to get channels. So the selector has to be given explicitly, but the boolean can be got implicitly from the selector. I also thought of another way before, that is defining another BroadcastRecordWriter for handling directly. Then the writer is distinguished by common `RecordWriter` and special `BroadcastRecordWriter`. In common `RecordWriter`, the selector is needed for routing channels, and for `BroadcastRecordWriter` it does not need the selector at all. 
To do so, we may need to further extract the common code for these two writers; furthermore, we may need to remove the current `StreamRecordWriter`, which would be integrated into the current `RecordWriter` directly. So the writers would be divided by broadcast mode, not by stream mode. What do you think of this approach? I will also think through the changes needed for the above two options. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
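Editor's sketch: the first option discussed in this comment (deriving the broadcast flag from the selector internally instead of passing a separate boolean to the constructor) could look roughly like the code below. All types here (`SelectorSketch`, `WriterSketch`, the `isBroadcast()` method) are hypothetical simplifications for illustration; they are not the actual Flink interfaces or the code in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified writer; not the real RecordWriter. */
public class WriterSketch {

    /** Hypothetical selector that picks a single channel per record. */
    public interface SelectorSketch {
        int selectChannel(String record, int numChannels);

        /** Assumed hook: lets the writer detect broadcast mode by itself. */
        default boolean isBroadcast() {
            return false;
        }
    }

    public static class RoundRobin implements SelectorSketch {
        private int next = -1;

        @Override
        public int selectChannel(String record, int numChannels) {
            next = (next + 1) % numChannels;
            return next;
        }
    }

    public static class Broadcast implements SelectorSketch {
        @Override
        public int selectChannel(String record, int numChannels) {
            throw new UnsupportedOperationException("broadcast targets every channel");
        }

        @Override
        public boolean isBroadcast() {
            return true;
        }
    }

    private final SelectorSketch selector;
    private final int numChannels;
    private final boolean isBroadcastSelector;

    public WriterSketch(SelectorSketch selector, int numChannels) {
        this.selector = selector;
        this.numChannels = numChannels;
        // The flag is derived from the selector, so the two can never disagree.
        this.isBroadcastSelector = selector.isBroadcast();
    }

    /** Returns the channels a record would be written to. */
    public List<Integer> targetChannels(String record) {
        if (isBroadcastSelector) {
            List<Integer> all = new ArrayList<>();
            for (int i = 0; i < numChannels; i++) {
                all.add(i);
            }
            return all;
        }
        return Collections.singletonList(selector.selectChannel(record, numChannels));
    }

    /** Writes n records and returns the target channels of the last one. */
    public static List<Integer> channelsForNthRecord(boolean broadcast, int numChannels, int n) {
        SelectorSketch selector = broadcast ? new Broadcast() : new RoundRobin();
        WriterSketch writer = new WriterSketch(selector, numChannels);
        List<Integer> last = Collections.emptyList();
        for (int i = 0; i < n; i++) {
            last = writer.targetChannels("record-" + i);
        }
        return last;
    }
}
```

The alternative raised in the comment, a dedicated `BroadcastRecordWriter`, would move the `isBroadcastSelector` branch into a subclass and drop the selector from that subclass altogether.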
[jira] [Comment Edited] (FLINK-11128) Methods of org.apache.flink.table.expressions.Expression are private[flink]
[ https://issues.apache.org/jira/browse/FLINK-11128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719822#comment-16719822 ] vinoyang edited comment on FLINK-11128 at 12/13/18 6:45 AM: [~dfakhritdinov] I tried it in a new project. However, it did not cause any problem. What is the phenomenon of "impossible" that you mean? was (Author: yanghua): [~dfakhritdinov] I tried it. However, it did not cause any problem. What is the phenomenon of "impossible" that you mean? > Methods of org.apache.flink.table.expressions.Expression are private[flink] > --- > > Key: FLINK-11128 > URL: https://issues.apache.org/jira/browse/FLINK-11128 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Denys Fakhritdinov >Assignee: vinoyang >Priority: Critical > > Source with push-down are part of public API: > [https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#defining-a-tablesource-with-filter-push-down] > > {{But all methods of org.apache.flink.table.expressions.Expression are > *{color:#FF}private[flink]{color}*, what makes it impossible to use.}} > > {{Please, change private[flink] to public.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11128) Methods of org.apache.flink.table.expressions.Expression are private[flink]
[ https://issues.apache.org/jira/browse/FLINK-11128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719822#comment-16719822 ] vinoyang commented on FLINK-11128: -- [~dfakhritdinov] I tried it. However, it did not cause any problem. What is the phenomenon of "impossible" that you mean? > Methods of org.apache.flink.table.expressions.Expression are private[flink] > --- > > Key: FLINK-11128 > URL: https://issues.apache.org/jira/browse/FLINK-11128 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.7.0 >Reporter: Denys Fakhritdinov >Assignee: vinoyang >Priority: Critical > > Source with push-down are part of public API: > [https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#defining-a-tablesource-with-filter-push-down] > > {{But all methods of org.apache.flink.table.expressions.Expression are > *{color:#FF}private[flink]{color}*, what makes it impossible to use.}} > > {{Please, change private[flink] to public.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11062) web ui log error
[ https://issues.apache.org/jira/browse/FLINK-11062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lining closed FLINK-11062. -- Resolution: Fixed > web ui log error > > > Key: FLINK-11062 > URL: https://issues.apache.org/jira/browse/FLINK-11062 > Project: Flink > Issue Type: Bug > Components: REST, Webfrontend >Affects Versions: 1.7.0 >Reporter: lining >Priority: Major > Attachments: image-2018-12-04-16-50-45-092.png, > image-2018-12-04-16-53-23-887.png > > > !image-2018-12-04-16-53-23-887.png! > > > see log will error which reason is net::ERR_CONTENT_LENGTH_MISMATCH -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dianfu commented on issue #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction
dianfu commented on issue #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction URL: https://github.com/apache/flink/pull/7253#issuecomment-446850114 @twalthr @sunjincheng121 Could you help to take a look at this PR? You can see from the harness test of #7286 that this change makes it very easy to write a new harness test. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-9462) Disentangle flink-json and flink-table
[ https://issues.apache.org/jira/browse/FLINK-9462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-9462: -- Assignee: boshu Zheng > Disentangle flink-json and flink-table > -- > > Key: FLINK-9462 > URL: https://issues.apache.org/jira/browse/FLINK-9462 > Project: Flink > Issue Type: Improvement > Components: Build System, Table API & SQL >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: boshu Zheng >Priority: Major > Fix For: 1.8.0 > > > The {{flink-json}} module defines Json serialization and deserialization > schemas. Additionally, it defines Json table descriptor. Due to this, it has > a dependency on {{flink-table}}. We should either rename this module into > {{flink-json-table}} or move the table API specific classes into a different > module. That way we could remove the dependency on {{flink-table}} which > decouples the Json serialization and deserialization schemas from the Table > API on which the schemas should not depend. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table
hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table URL: https://github.com/apache/flink/pull/6236#issuecomment-446846572 @zjffdu Thanks for the update. I will take a look within next week. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11035) Notify data available to network stack immediately after finishing BufferBuilder
[ https://issues.apache.org/jira/browse/FLINK-11035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719746#comment-16719746 ] zhijiang commented on FLINK-11035: -- [~pnowojski], thanks for your explanation! I am just not sure whether you have other thoughts for this issue before, and I am clear now. If there triggers a flush between two {{BufferBuilder}}'s add operation, then we can get benefits from current design. In other words the flush thread help us do the notification work to some extent. But the flush timeout is configured, not very determined, especially for batch job no flush mechanism. So it has two sides, and the disadvantage aspect may be just a corner case. I have not found the problems in real world yet, only when we mock to build such special scenarios. So this improvement is not critical and determined now, we can keep the current logic until we have new finds. :) > Notify data available to network stack immediately after finishing > BufferBuilder > > > Key: FLINK-11035 > URL: https://issues.apache.org/jira/browse/FLINK-11035 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The data availability notification for network relies on whether there are > finished _BufferBuilder_ or flush triggered. If flush is not triggered and > the first _BufferBuilder_ enqueues into the subpartition, although this > _BufferBuilder_ is finished on _RecordWriter_ side, it has to rely on > enqueuing the second _BufferBuilder_ to trigger notification available. It > may bring some delays for transporting the finished _BufferBuilder_ in > network, especially there has a blocking operation for requesting the second > _BufferBuilder_ from pool. 
> In an extreme scenario where only one buffer is available in LocalBufferPool, if the first _BufferBuilder_ is not transported and recycled, the > request for the second _BufferBuilder_ will block indefinitely. > I propose adding a _notifyBufferFinished_ method to the _ResultPartitionWriter_ > interface, so that _RecordWriter_ can notify through it after a _BufferBuilder_ > is finished. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6329: [FLINK-9841] Web UI only show partial taskmanager log
yanghua commented on issue #6329: [FLINK-9841] Web UI only show partial taskmanager log URL: https://github.com/apache/flink/pull/6329#issuecomment-446841558 @jinglining Not currently, I encountered this problem in our production environment, and I fixed it and verified it. It has been merged into Flink 1.6.0, and our current production environment version is 1.6.0. It behaves normally in our environment. What problem have you encountered? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates
[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-11136. - Resolution: Fixed Fix Version/s: 1.7.1 1.8.0 1.6.3 Fixed for 1.8.0 with ed0aefa6775f655591b4c0fe46382446921b7155 Fixed for 1.7.1 with ff1821a6d2f8317d0c344719b14350ac362143d9 Fixed for 1.6.3 with ff9b7f1b60a4aeb1d925b236b9818002aad830de > Fix the logical of merge for DISTINCT aggregates > > > Key: FLINK-11136 > URL: https://issues.apache.org/jira/browse/FLINK-11136 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.8.0, 1.7.1 > > Time Spent: 10m > Remaining Estimate: 0h > > The logic of merge for DISTINCT aggregates has bug. For the following query: > {code:java} > SELECT > c, > COUNT(DISTINCT b), > SUM(DISTINCT b), > SESSION_END(rowtime, INTERVAL '0.005' SECOND) > FROM MyTable > GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code} > the following exception will be thrown: > {code:java} > Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58) > at > org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50) > at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33) > at > org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102) > at > 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] ifndef-SleePy commented on issue #7290: [FLINK-11137] [runtime] Fix unexpected RegistrationTimeoutException of TaskExecutor
ifndef-SleePy commented on issue #7290: [FLINK-11137] [runtime] Fix unexpected RegistrationTimeoutException of TaskExecutor URL: https://github.com/apache/flink/pull/7290#issuecomment-446838962 The Travis build failed in the connectors module. I checked the failed test case, and it passes in my local environment, so it is probably an unstable test. I have updated the PR to trigger the Travis integration build again. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11001) Window rowtime attribute can't be renamed in Java
[ https://issues.apache.org/jira/browse/FLINK-11001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-11001. - Resolution: Fixed Fix Version/s: 1.8.0 Fixed for 1.8.0 with ff9b7f1b60a4aeb1d925b236b9818002aad830de > Window rowtime attribute can't be renamed in Java > - > > Key: FLINK-11001 > URL: https://issues.apache.org/jira/browse/FLINK-11001 > Project: Flink > Issue Type: Bug > Environment: >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently, we can rename window rowtime attribute like this in Scala: > {code:java} > table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > {code} > However, an exception will be thrown if we use java(by changing the > Expressions to String): > {code:java} > table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select("w.rowtime as rowtime, int.count as int") > {code} > The Exception is: > {code:java} > org.apache.flink.table.api.ExpressionParserException: Could not parse > expression at column 11: `,' expected but `a' found > w.rowtime as rowtime, int.count as int > {code} > > To solve the problem, we can add rename support in {{ExpressionParser}}. > However, this may conflict with the design of source which use as before > rowtime: > {code:java} > stream.toTable( > tEnv, > ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as > a"): _*) > {code} > Personally, I think we should keep the two consistent, so the final api would > be: > {code:java} > // window case > .select("w.rowtime as rowtime, int.count as int") > // source case > stream.toTable( > tEnv, > ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): > _*) > {code} > Any suggestions would be greatly appreciated! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs
fhueske commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs URL: https://github.com/apache/flink/pull/7248#issuecomment-446837983 Consistent documentation is a good point. However, I'd rather change the other examples than indicate a limitation that does not exist. Regarding the tests, IMO they should not be considered documentation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11146) Get rid of legacy codes from ClusterClient
TisonKun created FLINK-11146: Summary: Get rid of legacy codes from ClusterClient Key: FLINK-11146 URL: https://issues.apache.org/jira/browse/FLINK-11146 Project: Flink Issue Type: Sub-task Components: Client, Tests Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 As [~StephanEwen] mentioned on the ML, the client needs a big refactoring / cleanup. It should use a proper HTTP client library to help with future authentication mechanisms. After an investigation, I noticed that the only valid cluster clients are {{MiniClusterClient}} and {{RestClusterClient}}. The legacy clients, {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as the pre-FLIP-6 code inside {{ClusterClient}}, should be removed as part of FLINK-10392. With this removal we arrive at a clean state from which we can think more comfortably about how to implement a proper HTTP client. 1. {{StandaloneClusterClient}} is currently depended on by {{LegacyStandaloneClusterDescriptor}} (whose removal is tracked by FLINK-10700) and {{FlinkClient}} (part of flink-storm, which is slated for removal in FLINK-10571). The relevant tests also need to be ported (or directly removed). 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 (Remove legacy flink-yarn component). 3. Testing classes inheriting from {{ClusterClient}} need to be ported (or directly removed). 4. Get rid of the legacy code inside {{ClusterClient}} itself, such as {{#run(JobGraph, ClassLoader)}}. Besides, what is {{JobClient}} used for? I cannot find valid usages of it. cc [~mxm] [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11139) stream non window join support state ttl
[ https://issues.apache.org/jira/browse/FLINK-11139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719735#comment-16719735 ] Fabian Hueske commented on FLINK-11139: --- [~zhaoshijie] I gave you contributor permissions. > stream non window join support state ttl > > > Key: FLINK-11139 > URL: https://issues.apache.org/jira/browse/FLINK-11139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: zhaoshijie >Priority: Major > > The stream non-window join function uses timers to delete expired data. This is fine for > a small amount of data or a short expiration time, but it causes an OOM (too many > timers) on the TaskManager when there is a long expiration time and a large > amount of data. In fact, other stateful functions in the table module have the same problem. I > would like to contribute a fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11139) stream non window join support state ttl
[ https://issues.apache.org/jira/browse/FLINK-11139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719734#comment-16719734 ] Fabian Hueske commented on FLINK-11139: --- [~zhaoshijie], I assume you configured an [idle state retention time|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time]. How did you choose the min and max values? The difference between min and max determines how often a timer is registered. Moreover, [~hequn8128] reworked the cleanup timers such that they use the timer deletion feature that was added in Flink 1.6.0. The PR was merged a few days ago and should already fix the problem. > stream non window join support state ttl > > > Key: FLINK-11139 > URL: https://issues.apache.org/jira/browse/FLINK-11139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: zhaoshijie >Priority: Major > > The stream non-window join function uses timers to delete expired data. This is fine for > a small amount of data or a short expiration time, but it causes an OOM (too many > timers) on the TaskManager when there is a long expiration time and a large > amount of data. In fact, other stateful functions in the table module have the same problem. I > would like to contribute a fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
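Editor's sketch: the remark that the difference between min and max determines how often a timer is registered can be illustrated with a small plain-Java simulation. It assumes the following cleanup rule: on each access, a new cleanup timer is registered at `now + max` only if `now + min` is later than the currently registered cleanup time. That rule and all names (`RetentionSketch`, `countTimerRegistrations`) are assumptions made for illustration; the actual Flink operators may differ in detail.

```java
/** Hypothetical simulation of idle-state-retention timer registration; not Flink code. */
public class RetentionSketch {

    /**
     * Counts how many cleanup timers are registered for a single key that is
     * accessed once every `interval` time units, `totalAccesses` times in total.
     * `min` and `max` are the minimum and maximum retention times in the same units.
     */
    public static int countTimerRegistrations(long min, long max, long interval, int totalAccesses) {
        Long cleanupTime = null; // no timer registered yet
        int registrations = 0;
        for (int i = 0; i < totalAccesses; i++) {
            long now = i * interval;
            // Assumed rule: only register a new timer when the existing one
            // would fire before the minimum retention time has passed.
            if (cleanupTime == null || now + min > cleanupTime) {
                cleanupTime = now + max;
                registrations++;
            }
        }
        return registrations;
    }
}
```

Under this assumed rule, a key accessed once per time unit for 48 units registers 4 timers with min = 12 and max = 24, but 48 timers (one per access) with min = max = 24, so a larger gap between min and max means fewer timers per key.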
[GitHub] dianfu commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates
dianfu commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates URL: https://github.com/apache/flink/pull/7284#issuecomment-446836298 Thanks a lot for the review and merge @fhueske ! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates
asfgit closed pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates URL: https://github.com/apache/flink/pull/7284 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala index 566e3d7cbc5..57cc815fee0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala @@ -142,6 +142,21 @@ class AggregationCodeGenerator( fields.mkString(", ") } +val parametersCodeForDistinctMerge = aggFields.map { inFields => + val fields = inFields.filter(_ > -1).zipWithIndex.map { case (f, i) => +// index to constant +if (f >= physicalInputTypes.length) { + constantFields(f - physicalInputTypes.length) +} +// index to input field +else { + s"(${CodeGenUtils.boxedTypeTermForTypeInfo(physicalInputTypes(f))}) k.getField($i)" +} + } + + fields.mkString(", ") +} + // get method signatures val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes) val constantClasses = UserDefinedFunctionUtils.typeInfoToClass(constantTypes) @@ -643,7 +658,7 @@ class AggregationCodeGenerator( | (${classOf[Row].getCanonicalName}) entry.getKey(); | Long v = (Long) entry.getValue(); | if (aDistinctAcc$i.add(k, v)) { - |${aggs(i)}.accumulate(aAcc$i, k); + |${aggs(i)}.accumulate(aAcc$i, ${parametersCodeForDistinctMerge(i)}); | } |} |a.setField($i, aDistinctAcc$i); diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 46dde8e0225..ddc2a687541 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -78,6 +78,7 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT c, " + " COUNT(DISTINCT b)," + + " SUM(DISTINCT b)," + " SESSION_END(rowtime, INTERVAL '0.005' SECOND) " + "FROM MyTable " + "GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c " @@ -87,9 +88,9 @@ class SqlITCase extends StreamingWithStateTestBase { env.execute() val expected = Seq( - "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till {14L} - "Hello,1,1970-01-01 00:00:00.021", // window starts at [16L] till {21L}, not merged - "Hello,3,1970-01-01 00:00:00.015"// window starts at [1L,2L], + "Hello World,1,9,1970-01-01 00:00:00.014", // window starts at [9L] till {14L} + "Hello,1,16,1970-01-01 00:00:00.021", // window starts at [16L] till {21L}, not merged + "Hello,3,6,1970-01-01 00:00:00.015"// window starts at [1L,2L], // merged with [8L,10L], by [4L], till {15L} ) assertEquals(expected.sorted, StreamITCase.testResults.sorted) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
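The merge fix in the diff above can be illustrated without Flink. This is a minimal sketch, not the generated code: the distinct accumulator is modelled as a map from a key row (here a `List<Long>`) to a reference count, and `SumAcc`, `merge`, and `demo` are hypothetical names. The point it shows is the one the patch makes: when a value is seen for the first time during a merge, the aggregate should receive the field unpacked from the key row, not the row itself.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DistinctMergeSketch {

    /** Hypothetical SUM aggregate; accumulate expects the plain field value. */
    static final class SumAcc {
        long sum;

        void accumulate(long value) {
            sum += value;
        }
    }

    /**
     * Merges the distinct map `other` into `target`. Only keys that `target`
     * has not seen before are forwarded to the aggregate.
     */
    static void merge(Map<List<Long>, Long> target, Map<List<Long>, Long> other, SumAcc agg) {
        for (Map.Entry<List<Long>, Long> entry : other.entrySet()) {
            List<Long> k = entry.getKey();
            Long v = entry.getValue();
            boolean firstOccurrence = !target.containsKey(k);
            target.merge(k, v, Long::sum);
            if (firstOccurrence) {
                // Unpack the field from the key row instead of passing the row.
                agg.accumulate(k.get(0));
            }
        }
    }

    /** Returns {number of distinct values, SUM(DISTINCT)} after merging two windows. */
    static long[] demo() {
        Map<List<Long>, Long> left = new HashMap<>();
        left.put(Arrays.asList(1L), 1L);
        left.put(Arrays.asList(2L), 1L);
        SumAcc agg = new SumAcc();
        agg.accumulate(1L);
        agg.accumulate(2L);

        Map<List<Long>, Long> right = new HashMap<>();
        right.put(Arrays.asList(2L), 1L);
        right.put(Arrays.asList(3L), 1L);

        merge(left, right, agg);
        return new long[] {left.size(), agg.sum};
    }
}
```

In this sketch the value 2 appears in both windows, so it is counted once and only 3 is added to the running sum during the merge.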
[GitHub] asfgit closed pull request #7297: [hotfix] [docs] Fix typos in Table and SQL docs
asfgit closed pull request #7297: [hotfix] [docs] Fix typos in Table and SQL docs URL: https://github.com/apache/flink/pull/7297 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 55c144a8ed2..1e7ef83fcb7 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -22,7 +22,7 @@ specific language governing permissions and limitations under the License. --> -SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can seamlessly mixed and are holistically optimized and translated into a single program. +SQL queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SQL query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries](common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream](common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink](common.html#emit-a-table)). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program. In order to access a table in a SQL query, it must be [registered in the TableEnvironment](common.html#register-tables-in-the-catalog). 
A table can be registered from a [TableSource](common.html#register-a-tablesource), [Table](common.html#register-a-table), [DataStream, or DataSet](common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register external catalogs in a TableEnvironment](common.html#register-an-external-catalog) to specify the location of the data sources. diff --git a/docs/dev/table/streaming/dynamic_tables.md b/docs/dev/table/streaming/dynamic_tables.md index 698cd673185..3420af73840 100644 --- a/docs/dev/table/streaming/dynamic_tables.md +++ b/docs/dev/table/streaming/dynamic_tables.md @@ -53,12 +53,12 @@ The following table compares traditional relational algebra and stream processin -Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes outdated when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update materialized views and updates a materialized view as soon as its base tables are updated. +Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. 
A materialized view becomes outdated when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update a materialized view as soon as its base tables are updated. The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following: - A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*. -- A materialized view is defined as a SQL query. In order to update the view, the query is continuously processes the changelog streams of the view's base relations. +- A materialized view is defined as a SQL query. In order to update the view, the query continuously processes the changelog streams of the view's base relations. - The materialized view is the result of the streaming SQL query. With these points in mind, we introduce following concept of *Dynamic tables* in the next section. @@ -177,7 +177,7 @@ When converting a dynamic table into a stream or writing it to an external syste -* **Upsert stream:** An upsert stream is a stream with two types
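The idea of eager view maintenance described in the documentation text above can be sketched in a few lines of plain Java. This is only an illustration under assumptions: the view stands for a query of the form `SELECT user, COUNT(*) FROM clicks GROUP BY user`, and the class, enum, and method names are hypothetical. Each change of the base table is applied to the view immediately, so the view never serves an outdated result.

```java
import java.util.HashMap;
import java.util.Map;

final class EagerViewMaintenanceSketch {

    /** Kinds of changelog entries; an UPDATE can be modelled as DELETE followed by INSERT. */
    enum Kind { INSERT, DELETE }

    /** Materialized result of the grouped count query, keyed by user. */
    private final Map<String, Long> view = new HashMap<>();

    /** Applies one changelog entry of the base table to the view right away. */
    void apply(Kind kind, String user) {
        long delta = (kind == Kind.INSERT) ? 1L : -1L;
        long updated = view.getOrDefault(user, 0L) + delta;
        if (updated <= 0) {
            view.remove(user); // no rows left for this user
        } else {
            view.put(user, updated);
        }
    }

    /** Returns a copy of the current view contents. */
    Map<String, Long> snapshot() {
        return new HashMap<>(view);
    }
}
```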
[GitHub] jinglining commented on issue #6329: [FLINK-9841] Web UI only show partial taskmanager log
jinglining commented on issue #6329: [FLINK-9841] Web UI only show partial taskmanager log URL: https://github.com/apache/flink/pull/6329#issuecomment-446836098 > @zentol Yes, you are right, sorry about my wording. Here we should not use try-with-resources, because the listener will close the file, and it seems that try-with-resources closes the file before the completion listener runs. Hi @yanghua, do you have a test case for this solution?
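The ordering problem mentioned in the comment above can be reproduced with the standard library alone. This is a minimal sketch, assuming an asynchronous transfer modelled by a `CompletableFuture`; `TrackedResource` and both method names are hypothetical and not part of the PR. It shows that a try-with-resources block closes the resource as soon as the block exits, which can be before an asynchronous completion listener runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseOrderingSketch {

    /** Hypothetical resource that records whether close() was called. */
    static final class TrackedResource implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Returns true if the resource was already closed when the listener ran. */
    static boolean closedEarlyWithTryWithResources() {
        CompletableFuture<Void> transfer = new CompletableFuture<>();
        AtomicBoolean closedWhenListenerRan = new AtomicBoolean(false);
        try (TrackedResource resource = new TrackedResource()) {
            // The listener is only registered here; it runs when the transfer completes.
            transfer.thenRun(() -> closedWhenListenerRan.set(resource.closed.get()));
        } // close() is called here, before the transfer has completed
        transfer.complete(null);
        return closedWhenListenerRan.get();
    }

    /** Same flow, but the listener itself is responsible for closing. */
    static boolean closedEarlyWhenListenerCloses() {
        CompletableFuture<Void> transfer = new CompletableFuture<>();
        AtomicBoolean closedWhenListenerRan = new AtomicBoolean(false);
        TrackedResource resource = new TrackedResource();
        transfer.thenRun(() -> {
            closedWhenListenerRan.set(resource.closed.get());
            resource.close(); // close only after the transfer is done
        });
        transfer.complete(null);
        return closedWhenListenerRan.get();
    }
}
```

A test along these lines could assert that the first variant observes a closed resource and the second does not.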
[GitHub] asfgit closed pull request #7282: [hotfix][docs] imporve docs of batch overview
asfgit closed pull request #7282: [hotfix][docs] imporve docs of batch overview URL: https://github.com/apache/flink/pull/7282 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md index d0043647227..b4dd1671d85 100644 --- a/docs/dev/batch/index.md +++ b/docs/dev/batch/index.md @@ -406,7 +406,8 @@ DataSet result = in.partitionByRange(0) Note: This method works only on single field keys. {% highlight java %} DataSet> in = // [...] -DataSet result = in.partitionCustom(Partitioner partitioner, key) +DataSet result = in.partitionCustom(partitioner, key) +.mapPartition(new PartitionMapper()); {% endhighlight %} @@ -709,7 +710,7 @@ val result = in.partitionByRange(0).mapPartition { ... } {% highlight scala %} val in: DataSet[(Int, String)] = // [...] val result = in - .partitionCustom(partitioner, key).mapPartition { ... } {% endhighlight %}
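The documentation change above passes an existing `partitioner` instance instead of a parameter declaration. What such a partitioner does can be sketched without Flink. The interface below is a hypothetical stand-in with the same shape (key and number of partitions in, partition index out); `partitionCustom` and `partitionSizes` are illustrative helpers, not the DataSet API.

```java
import java.util.ArrayList;
import java.util.List;

final class CustomPartitionSketch {

    /** Hypothetical stand-in: maps a key to a partition index in [0, numPartitions). */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Distributes keys into numPartitions buckets according to the partitioner. */
    static <K> List<List<K>> partitionCustom(List<K> keys, Partitioner<K> partitioner, int numPartitions) {
        List<List<K>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (K key : keys) {
            partitions.get(partitioner.partition(key, numPartitions)).add(key);
        }
        return partitions;
    }

    /** A per-partition function, here simply the number of elements in each partition. */
    static <K> List<Integer> partitionSizes(List<List<K>> partitions) {
        List<Integer> sizes = new ArrayList<>();
        for (List<K> partition : partitions) {
            sizes.add(partition.size());
        }
        return sizes;
    }
}
```

For example, partitioning the keys 1 to 6 by `key mod 2` yields two partitions of three elements each, with the even keys in partition 0.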
[jira] [Updated] (FLINK-11146) Get rid of legacy codes from ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-11146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-11146: - Description: As [~StephanEwen] mentioned in ML, the client needs big refactoring / cleanup. It should use a proper HTTP client library to help with future authentication mechanisms. After an investigation I notice that the valid cluster clients are only {{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. With this removal we arrive a clean stage where we can think how to implement a proper HTTP client more comfortably. 1. {{StandaloneClusterClient}} is now depended on by {{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) and {{FlinkClient}}(part of flink-storm which is decided to be removed FLINK-10571). Also relevant tests need to be ported(or directly removed). 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 Remove legacy flink-yarn component. 3. Testing classes inheriting from {{ClusterClient}} need to be ported(or directly removed). 4. Get rid of legacy codes inside {{ClusterClient}} it self, such as {{#run(JobGraph, ClassLoader)}} Besides, what is {{JobClient}} used for? I cannot find valid usages of it. (Till mentioned it at ML https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E) cc [~mxm] [~till.rohrmann] was: As [~StephanEwen] mentioned in ML, the client needs big refactoring / cleanup. It should use a proper HTTP client library to help with future authentication mechanisms. After an investigation I notice that the valid cluster clients are only {{MiniClusterClient}} and {{RestClusterClient}}. 
Legacy clients, {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. With this removal we arrive a clean stage where we can think how to implement a proper HTTP client more comfortably. 1. {{StandaloneClusterClient}} is now depended on by {{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) and {{FlinkClient}}(part of flink-storm which is decided to be removed FLINK-10571). Also relevant tests need to be ported(or directly removed). 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 Remove legacy flink-yarn component. 3. Testing classes inheriting from {{ClusterClient}} need to be ported(or directly removed). 4. Get rid of legacy codes inside {{ClusterClient}} it self, such as {{#run(JobGraph, ClassLoader)}} Besides, what is {{JobClient}} used for? I cannot find valid usages of it. cc [~mxm] [~till.rohrmann] > Get rid of legacy codes from ClusterClient > -- > > Key: FLINK-11146 > URL: https://issues.apache.org/jira/browse/FLINK-11146 > Project: Flink > Issue Type: Task > Components: Client, Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.8.0 > > > As [~StephanEwen] mentioned in ML, the client needs big refactoring / > cleanup. It should use a proper HTTP client library to help with future > authentication mechanisms. > After an investigation I notice that the valid cluster clients are only > {{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, > {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 > codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. > With this removal we arrive a clean stage where we can think how to implement > a proper HTTP client more comfortably. > 1. 
{{StandaloneClusterClient}} is now depended on by > {{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) > and {{FlinkClient}}(part of flink-storm which is decided to be removed > FLINK-10571). Also relevant tests need to be ported(or directly removed). > 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 > Remove legacy flink-yarn component. > 3. Testing classes inheriting from {{ClusterClient}} need to be ported(or > directly removed). > 4. Get rid of legacy codes inside {{ClusterClient}} it self, such as > {{#run(JobGraph, ClassLoader)}} > Besides, what is {{JobClient}} used for? I cannot find valid usages of it. > (Till mentioned it at ML > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E) > cc [~mxm] [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11146) Get rid of legacy codes from ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-11146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-11146: - Issue Type: Task (was: Sub-task) Parent: (was: FLINK-10392) > Get rid of legacy codes from ClusterClient > -- > > Key: FLINK-11146 > URL: https://issues.apache.org/jira/browse/FLINK-11146 > Project: Flink > Issue Type: Task > Components: Client, Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.8.0 > > > As [~StephanEwen] mentioned in ML, the client needs big refactoring / > cleanup. It should use a proper HTTP client library to help with future > authentication mechanisms. > After an investigation I notice that the valid cluster clients are only > {{MiniClusterClient}} and {{RestClusterClient}}. Legacy clients, > {{StandaloneClusterClient}} and {{YarnClusterClient}}, as well as pre-FLIP-6 > codes inside {{ClusterClient}}, should be removed as part of FLINK-10392. > With this removal we arrive a clean stage where we can think how to implement > a proper HTTP client more comfortably. > 1. {{StandaloneClusterClient}} is now depended on by > {{LegacyStandaloneClusterDescriptor}} (the removal is tracked by FLINK-10700) > and {{FlinkClient}}(part of flink-storm which is decided to be removed > FLINK-10571). Also relevant tests need to be ported(or directly removed). > 2. The removal of {{YarnClusterClient}} should go along with FLINK-11106 > Remove legacy flink-yarn component. > 3. Testing classes inheriting from {{ClusterClient}} need to be ported(or > directly removed). > 4. Get rid of legacy codes inside {{ClusterClient}} it self, such as > {{#run(JobGraph, ClassLoader)}} > Besides, what is {{JobClient}} used for? I cannot find valid usages of it. > cc [~mxm] [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11139) stream non window join support state ttl
[ https://issues.apache.org/jira/browse/FLINK-11139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719728#comment-16719728 ] Hequn Cheng commented on FLINK-11139: - [~zhaoshijie] Hi, thanks for reporting and trying to fix the problem. Do you have any concrete plans for how to fix it? It is suggested to share your ideas before opening a PR. :) > stream non window join support state ttl > > > Key: FLINK-11139 > URL: https://issues.apache.org/jira/browse/FLINK-11139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: zhaoshijie >Priority: Major > > The stream non-window join function uses timers to delete expired data. This is fine for > a small amount of data or a short expiration time, but it can cause an OOM (too many > timers) on the TaskManager when the expiration time is long and the > amount of data is large. In fact, other stateful functions in the table module have the same problem. I > would like to contribute a fix.
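The contrast drawn in the issue above, one timer per record versus time-to-live on the state itself, can be sketched as follows. This is not Flink's state TTL API; it is a minimal stand-alone illustration with hypothetical names, in which every entry carries its last update time and expired entries are dropped lazily when they are read, so no timer objects accumulate.

```java
import java.util.HashMap;
import java.util.Map;

final class LazyTtlStateSketch<K, V> {

    /** Value together with the time it was last written. */
    private static final class Entry<T> {
        final T value;
        final long lastUpdateMillis;

        Entry(T value, long lastUpdateMillis) {
            this.value = value;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    LazyTtlStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Writes a value and refreshes its timestamp. */
    void put(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns the value, or null if it is absent or expired; expired entries are removed. */
    V get(K key, long nowMillis) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMillis - entry.lastUpdateMillis >= ttlMillis) {
            entries.remove(key); // lazy cleanup on access, no timer needed
            return null;
        }
        return entry.value;
    }

    int size() {
        return entries.size();
    }
}
```

The trade-off in this sketch is that entries which are never read again stay in memory until some other cleanup pass removes them.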
[GitHub] hequn8128 commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java
hequn8128 commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java URL: https://github.com/apache/flink/pull/7289#issuecomment-446833240 @fhueske Thanks a lot for your review!
[jira] [Commented] (FLINK-11138) RocksDB's wal seems to be useless in rocksdb-state-backend that reduce the disk's overhead
[ https://issues.apache.org/jira/browse/FLINK-11138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719713#comment-16719713 ] Yun Tang commented on FLINK-11138: -- Yes, you are right. But we already disable the WAL of RocksDB. You can see the configuration [here|https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L327]. > RocksDB's wal seems to be useless in rocksdb-state-backend that reduce the > disk's overhead > -- > > Key: FLINK-11138 > URL: https://issues.apache.org/jira/browse/FLINK-11138 > Project: Flink > Issue Type: Improvement >Reporter: cailiuyang >Priority: Major > > The WAL of RocksDB is used to recover after a crash, but after a Flink job > crashes, it always recovers from the last checkpoint, so it seems the WAL is useless. > Am I right?
[jira] [Updated] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11088: -- Description: Currently flink-yarn assumes keytab is shipped as application master environment local resource on client side and will be distributed to all the TMs. This does not work for YARN proxy user mode [1] since proxy user or super user might not have access to actual users' keytab, but can request delegation tokens on users' behalf. Based on the type of security options for long-living YARN service[2], we propose to have the keytab file path discovery configurable depending on the launch mode of the YARN client. Reference: [1] https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html [2] https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services was: Currently flink-yarn assumes keytab is shipped as application master environment local resource on client side and will be distributed to all the TMs. This does not work for YARN proxy user mode since proxy user or super user does not have access to actual user's keytab but only delegation tokens. We propose to have the keytab file path discovery configurable depending on the launch mode of the YARN client. 
Reference: [1] https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html [2] https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode [1] since proxy user or > super user might not have access to actual users' keytab, but can request > delegation tokens on users' behalf. > Based on the type of security options for long-living YARN service[2], we > propose to have the keytab file path discovery configurable depending on the > launch mode of the YARN client. > Reference: > [1] > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html > [2] > https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9529) Do not expose keyed operations on ProcessFunction.Context
[ https://issues.apache.org/jira/browse/FLINK-9529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-9529: -- Assignee: boshu Zheng > Do not expose keyed operations on ProcessFunction.Context > - > > Key: FLINK-9529 > URL: https://issues.apache.org/jira/browse/FLINK-9529 > Project: Flink > Issue Type: Sub-task > Components: Java API >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: boshu Zheng >Priority: Major > > Currently, the {{ProcessFunction}} provides a {{Context}} object from which > one can access the {{TimerService}}. The {{TimerService}} allows registering > timers if one operates on a keyed stream. If it is not a keyed stream, then > this operation fails. Recently, we added a {{KeyedProcessFunction}} which > captures this semantic difference. I would propose to clean up the > {{Context}} interface to give access only to the operations supported when > operating on a non-keyed and keyed stream respectively.
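The proposal above amounts to moving keyed-only operations out of the general context type, so that misuse fails at compile time rather than at runtime. A minimal sketch of that design, with hypothetical interface and method names that are not Flink's actual signatures:

```java
import java.util.ArrayList;
import java.util.List;

final class ContextSplitSketch {

    /** Operations that are valid on any stream. */
    interface Context {
        long timestamp();
    }

    /** Adds the operations that only make sense on a keyed stream. */
    interface KeyedContext<K> extends Context {
        K currentKey();

        void registerTimer(long time);
    }

    /** Simple keyed context that records the timers registered on it. */
    static final class RecordingKeyedContext implements KeyedContext<String> {
        final List<Long> timers = new ArrayList<>();
        private final String key;
        private final long timestamp;

        RecordingKeyedContext(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public long timestamp() {
            return timestamp;
        }

        @Override
        public String currentKey() {
            return key;
        }

        @Override
        public void registerTimer(long time) {
            timers.add(time);
        }
    }

    /** A non-keyed function only sees Context, so it cannot call registerTimer at all. */
    static String processNonKeyed(Context ctx) {
        return "ts=" + ctx.timestamp();
    }

    /** A keyed function receives the richer type and may register timers. */
    static void processKeyed(KeyedContext<String> ctx) {
        ctx.registerTimer(ctx.timestamp() + 1000L);
    }
}
```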
[jira] [Commented] (FLINK-11143) AskTimeoutException is thrown during job submission and completion
[ https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719694#comment-16719694 ] Qi commented on FLINK-11143: The hard-coded values I’ve found are [1] and [2]. I think they should be made configurable parameters. [1] [https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L187] [2] [https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117] > AskTimeoutException is thrown during job submission and completion > -- > > Key: FLINK-11143 > URL: https://issues.apache.org/jira/browse/FLINK-11143 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.2 >Reporter: Alex Vinnik >Priority: Major > > For more details please see the thread > [http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3cc2fb26f9-1410-4333-80f4-34807481b...@gmail.com%3E] > On submission > 2018-12-12 02:28:31 ERROR JobsOverviewHandler:92 - Implementation error: > Unhandled exception. > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#225683351|#225683351]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". 
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > > On completion > > {"errors":["Internal server error."," side:\njava.util.concurrent.CompletionException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\". 
> at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748)\nCaused by: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms].
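The suggestion in the comment on this issue, turning hard-coded values into parameters, can be sketched with the standard library. Everything named here is an assumption for illustration: the configuration key `sketch.ask.timeout.ms`, the 10 second default, and the helper methods are hypothetical and do not correspond to existing Flink options.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ConfigurableTimeoutSketch {

    /** Hypothetical configuration key, not an existing Flink option. */
    static final String TIMEOUT_KEY = "sketch.ask.timeout.ms";

    /** Arbitrary default used only when the key is not configured. */
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);

    /** Reads the timeout from the configuration instead of hard-coding it. */
    static Duration resolveTimeout(Map<String, String> config) {
        String raw = config.get(TIMEOUT_KEY);
        return raw == null ? DEFAULT_TIMEOUT : Duration.ofMillis(Long.parseLong(raw.trim()));
    }

    /** Waits for the reply up to the given timeout and returns the fallback if it does not arrive. */
    static <T> T askOrFallback(CompletableFuture<T> reply, Duration timeout, T fallback) {
        try {
            return reply.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```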
[jira] [Commented] (FLINK-10882) Misleading job/task state for scheduled jobs
[ https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719670#comment-16719670 ] lining commented on FLINK-10882: Looking at the code, org.apache.flink.runtime.jobgraph.JobStatus currently defines RUNNING as: some tasks are scheduled or running, some may be pending, some may be finished. Should we add a new status? > Misleading job/task state for scheduled jobs > > > Key: FLINK-10882 > URL: https://issues.apache.org/jira/browse/FLINK-10882 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > Attachments: list_view.png, task_view.png > > > Submitting a job when not enough resources are available currently > causes the job to stay in a {{CREATE/SCHEDULED}} state. > There are 2 issues with how this is presented in the UI. > The {{Running Jobs}} page incorrectly states that the job is running. > (see list_view attachment) > EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING > state. > The state display for individual tasks either > # States the task is in a CREATED state, when it is actually SCHEDULED > # States the task is in a CREATED state, but the count for all state boxes is > zero. > (see task_view attachment)
[jira] [Commented] (FLINK-10882) Misleading job/task state for scheduled jobs
[ https://issues.apache.org/jira/browse/FLINK-10882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719654#comment-16719654 ] lining commented on FLINK-10882: Hi [~Zentol], I'm interested in this. Could you assign this Jira to me? > Misleading job/task state for scheduled jobs > > > Key: FLINK-10882 > URL: https://issues.apache.org/jira/browse/FLINK-10882 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Priority: Major > Attachments: list_view.png, task_view.png > > > Submitting a job when not enough resources are available currently > causes the job to stay in a {{CREATE/SCHEDULED}} state. > There are 2 issues with how this is presented in the UI. > The {{Running Jobs}} page incorrectly states that the job is running. > (see list_view attachment) > EDIT: Actually, from a runtime perspective the job is in fact in a RUNNING > state. > The state display for individual tasks either > # States the task is in a CREATED state, when it is actually SCHEDULED > # States the task is in a CREATED state, but the count for all state boxes is > zero. > (see task_view attachment)
[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719651#comment-16719651 ] Sergei Poganshev commented on FLINK-11127: -- Here's a (slightly tricky) workaround that works:
* configure *metrics.internal.query-service.port* property to some fixed port (e.g. **)
* in taskmanager deployment:
** expose that fixed port in taskmanager deployment spec
** use init container to put taskmanager's pod ip to flink configuration file:
*** define a shared _emptyDir_ volume for *both* flink container and init container to use that flink container will mount to /etc/flink
*** init container:
**** add environment variable to init container definition that references *status.podIP*
**** mount flink configuration folder to init container (anywhere other than /etc/flink) - this will be used as a basis for changes
**** copy files from this folder to /etc/flink
**** make init container command append pod ip to flink-conf.yaml as a value for *taskmanager.host* property
*** since /etc/flink is mounted to both containers, changes in init container will be visible in flink container
Configured this way jobmanager will connect to taskmanagers IPs directly to a configured fixed port. > Make metrics query service establish connection to JobManager > - > > Key: FLINK-11127 > URL: https://issues.apache.org/jira/browse/FLINK-11127 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Kubernetes, Metrics >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Priority: Major > > As part of FLINK-10247, the internal metrics query service has been separated > into its own actor system. Before this change, the JobManager (JM) queried > TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a > separate connection to the TM metrics query service actor. 
> In the context of Kubernetes, this is problematic as the JM will typically > *not* be able to resolve the TMs by name, resulting in warnings as follows: > {code} > 2018-12-11 08:32:33,962 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has > failed, address is now gated for [50] ms. Reason: [Association failed with > [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused > by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve] > {code} > In order to expose the TMs by name in Kubernetes, users require a service > *for each* TM instance which is not practical. > This currently results in the web UI not being able to display some basic metrics > about number of sent records. You can reproduce this by following the READMEs > in {{flink-container/kubernetes}}. > This worked before, because the JM is typically exposed via a service with a > known name and the TMs establish the connection to it which the metrics query > service piggybacked on. > A potential solution to this might be to let the query service connect to the > JM similar to how the TMs register. > I tagged this ticket as an improvement, but in the context of Kubernetes I > would consider this to be a bug.
[jira] [Closed] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-7293. -- > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > Currently, when {{ProcessingTime}} is configured, the events are fed to NFA > in the order of the arriving time and when {{EventTime}} is configured, the > events are fed to NFA in the order of the event time. It should also allow > custom {{order by}} to allow users to define the order of the events besides > the above factors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java
fhueske commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java URL: https://github.com/apache/flink/pull/7289#issuecomment-446810382 Thanks for the fix @hequn8128! I'll merge this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua edited a comment on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs
yanghua edited a comment on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs URL: https://github.com/apache/flink/pull/7248#issuecomment-446808073 @fhueske you are right. But there may be some points that can be misleading for some users: * Sometimes fields of the same name are more easily visually identified as having the same type; * Multiple test methods in `SetOperatorsITCase` explicitly specify the same fields (a, b, c) for the left and right Tables, because the document is just an incomplete example, sometimes after the user views the document, they will try to search the source code and see real cases in the project; * Other operators of multi-table operations in the same document such as `union`, `minus` also use the same field name This is why I said LGTM. But in fact, for those who understand the use of this operator, it does not matter whether it is modified or not. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs
yanghua commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs URL: https://github.com/apache/flink/pull/7248#issuecomment-446808073 @fhueske you are right. But there may be some points that can be misleading for some users: * Sometimes fields of the same name are more easily visually identified as having the same type; * Multiple test methods in `SetOperatorsITCase` explicitly specify the same fields (a, b, c) for the left and right Tables, because the document is just an incomplete example, sometimes after the user views the document, they will try to go to the project. Search for source code and see real cases; * Other operators of multi-table operations in the same document such as `union`, `minus` also use the same field name This is why I said LGTM. But in fact, for those who understand the use of this operator, it does not matter whether it is modified or not. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates
[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-11136: -- Issue Type: Bug (was: Test) > Fix the logical of merge for DISTINCT aggregates > > > Key: FLINK-11136 > URL: https://issues.apache.org/jira/browse/FLINK-11136 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > The logic of merge for DISTINCT aggregates has bug. For the following query: > {code:java} > SELECT > c, > COUNT(DISTINCT b), > SUM(DISTINCT b), > SESSION_END(rowtime, INTERVAL '0.005' SECOND) > FROM MyTable > GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code} > the following exception will be thrown: > {code:java} > Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58) > at > org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50) > at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33) > at > org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91) > at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
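To make the FLINK-11136 problem easier to picture: merging two DISTINCT accumulators means merging the sets of already-seen values and feeding only the newly seen plain values into the underlying aggregate. The Python sketch below illustrates that idea for SUM(DISTINCT b); the class `DistinctSumAccumulator` is hypothetical and is not Flink's generated code. The stack trace suggests that a wrapped `Row` rather than the plain value reached `SumAggFunction.accumulate` during merge, but that reading is an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class DistinctSumAccumulator:
    """State for SUM(DISTINCT x): running sum plus a count per distinct value."""
    total: int = 0
    seen: Dict[int, int] = field(default_factory=dict)

    def accumulate(self, value: int) -> None:
        count = self.seen.get(value, 0)
        if count == 0:
            # First occurrence of this value: it contributes to the sum once.
            self.total += value
        self.seen[value] = count + 1

    def merge(self, other: "DistinctSumAccumulator") -> None:
        for value, count in other.seen.items():
            if value not in self.seen:
                # Only values that are new to this accumulator change the sum;
                # the plain value is added, not its wrapper or its count.
                self.total += value
            self.seen[value] = self.seen.get(value, 0) + count
```

Session windows merge when events arrive close together, which is why the query in the ticket exercises this merge path at all.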
[GitHub] fhueske commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates
fhueske commented on issue #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates URL: https://github.com/apache/flink/pull/7284#issuecomment-446804905 Thanks for fixing this bug @dianfu! Will merge this fix.
[jira] [Closed] (FLINK-9232) Add harness test for AggregationCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong closed FLINK-9232. Resolution: Duplicate > Add harness test for AggregationCodeGenerator > -- > > Key: FLINK-9232 > URL: https://issues.apache.org/jira/browse/FLINK-9232 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Instead of relying on ITCase to cover the codegen result. We should have > direct test against that, for example using Harness test framework. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #7282: [hotfix][docs] imporve docs of batch overview
fhueske commented on issue #7282: [hotfix][docs] imporve docs of batch overview URL: https://github.com/apache/flink/pull/7282#issuecomment-446795624 Thanks for the PR @KarmaGYZ. I'll merge it.
[GitHub] fhueske commented on issue #7297: [hotfix] [docs] Fix typos in Table and SQL docs
fhueske commented on issue #7297: [hotfix] [docs] Fix typos in Table and SQL docs URL: https://github.com/apache/flink/pull/7297#issuecomment-446791047 Thanks for the PR @afedulov. I'll merge it.
[GitHub] fhueske commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs
fhueske commented on issue #7248: [hotfix][tableApi] Fix the description information of intersect in tableApi docs URL: https://github.com/apache/flink/pull/7248#issuecomment-446785876 Hi @maqingxiang, thanks for this PR. The current documentation is correct. For `intersect` / `intersectAll`, both input tables only need to have the same field types but the field names can differ. I'd like to keep the docs as they are to make this behavior clear. Would you please close this PR? Thank you, Fabian.
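The rule stated in the comment above (same field types, names may differ) can be illustrated with a small Python check. The function `can_intersect` and the schema representation as (name, type) pairs are hypothetical; this is not Flink's validation code.

```python
def can_intersect(left_schema, right_schema):
    """Return True if two schemas are compatible for intersect / intersectAll.

    Each schema is a list of (field_name, field_type) pairs. Only the types
    are compared, position by position; the field names are ignored.
    """
    left_types = [field_type for _, field_type in left_schema]
    right_types = [field_type for _, field_type in right_schema]
    return left_types == right_types
```

For example, a left table with fields (a INT, b STRING) and a right table with fields (d INT, e STRING) would pass this check, while (a INT) against (a STRING) would not.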
[GitHub] zentol commented on a change in pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher
zentol commented on a change in pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher URL: https://github.com/apache/flink/pull/7295#discussion_r241158558 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java ## @@ -52,25 +46,6 @@ public static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET; - public static CompletableFuture<Optional<String>> getRedirectAddress( - String localRestAddress, - RestfulGateway restfulGateway, - Time timeout) { - - return restfulGateway.requestRestAddress(timeout).thenApply( - (String remoteRestAddress) -> { - if (Objects.equals(localRestAddress, remoteRestAddress)) { - return Optional.empty(); - } else { - return Optional.of(remoteRestAddress); - } - }); - } - - public static HttpResponse getRedirectResponse(String redirectAddress, String path) { - return getRedirectResponse(redirectAddress, path, HttpResponseStatus.TEMPORARY_REDIRECT); - } - public static HttpResponse getRedirectResponse(String redirectAddress, String path, HttpResponseStatus code) { Review comment: I suppose the remaining ones are used for the SSL redirection?
[GitHub] zentol commented on a change in pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher
zentol commented on a change in pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher URL: https://github.com/apache/flink/pull/7295#discussion_r241160578 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/LeaderRetrievalHandlerTest.java ## @@ -101,31 +87,14 @@ public void testRedirectHandler() throws Exception { configuration); try (HttpTestClient httpClient = new HttpTestClient("localhost", bootstrap.getServerPort())) { - // 1. without completed local address future --> Internal server error + // 1. no leader gateway available --> Service unavailable httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout)); HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout)); - Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus()); - - // 2. with completed local address future but no leader gateway available --> Service unavailable - localAddressFuture.complete(correctAddress); - - httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout)); - - response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout)); - Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus()); - // 3. with leader gateway which is not the one of this REST endpoints --> Redirection required - httpClient.sendGetRequest(restPath, FutureUtils.toFiniteDuration(timeout)); - - response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout)); - - Assert.assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, response.getStatus()); - Assert.assertEquals(expectedRedirection, response.getLocation()); - - // 4. with local REST endpoint + // 2. with leader Review comment: help me out here, why is the leader now suddenly available? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] afedulov opened a new pull request #7297: [hotfix] [docs] Fix typos in Table and SQL docs
afedulov opened a new pull request #7297: [hotfix] [docs] Fix typos in Table and SQL docs URL: https://github.com/apache/flink/pull/7297 ## What is the purpose of the change Fix typos in Table and SQL docs. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs
fhueske commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs URL: https://github.com/apache/flink/pull/7276#issuecomment-446699926 There was a discussion about a 1.5.6 on the dev ML (started by Till) a few days ago and it was concluded that there will be another release.
[jira] [Created] (FLINK-11145) Fix Hadoop version handling in binary release script
Thomas Weise created FLINK-11145: Summary: Fix Hadoop version handling in binary release script Key: FLINK-11145 URL: https://issues.apache.org/jira/browse/FLINK-11145 Project: Flink Issue Type: Improvement Components: Build System Reporter: Thomas Weise Assignee: Thomas Weise It is currently not possible to build the package for a single Hadoop version only (even though the logic exists in the script). Also the Hadoop versions we build for are repeated and inconsistent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tweise commented on issue #7278: [FLINK-11145] Allow for override of HADOOP_VERSION for single variant binary release build
tweise commented on issue #7278: [FLINK-11145] Allow for override of HADOOP_VERSION for single variant binary release build URL: https://github.com/apache/flink/pull/7278#issuecomment-446677083 @zentol PTAL - with fix for the version duplication
[GitHub] lamber-ken commented on issue #7279: [hotfix] add hadoop user name to yarn logs command
lamber-ken commented on issue #7279: [hotfix] add hadoop user name to yarn logs command URL: https://github.com/apache/flink/pull/7279#issuecomment-446672016 hi, @zentol @tillrohrmann, cc.
[GitHub] lamber-ken opened a new pull request #7296: [hotfix][web] fix the desc about restart strategy in web
lamber-ken opened a new pull request #7296: [hotfix][web] fix the desc about restart strategy in web URL: https://github.com/apache/flink/pull/7296 ## What is the purpose of the change In the Flink job configuration page, it is better to use `Restart strategy`.
[GitHub] zentol commented on a change in pull request #7293: [FLINK-11144][tests] Make tests run on Java 9
zentol commented on a change in pull request #7293: [FLINK-11144][tests] Make tests run on Java 9 URL: https://github.com/apache/flink/pull/7293#discussion_r241106082 ## File path: pom.xml ## @@ -1602,7 +1616,7 @@ under the License. <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> - <version>3.0.0</version> + <version>3.1.1</version> Review comment: please move this into the jdk9 profile; I don't have the time right now to verify that the shading still works correctly for all modules.
[GitHub] mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs
mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs URL: https://github.com/apache/flink/pull/7276#issuecomment-44924 Btw, I also want to backport this to 1.5 and 1.6. Do you know if there is another 1.5 release planned? Or is 1.5 already end of life with 1.7 being out?
[jira] [Commented] (FLINK-11127) Make metrics query service establish connection to JobManager
[ https://issues.apache.org/jira/browse/FLINK-11127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719153#comment-16719153 ] Sergei Poganshev commented on FLINK-11127: -- [~uce] Ah. Thanks for the heads up. We'll probably use StatefulSet just for development purposes for now (on a small number of nodes), until the problem is fixed. > Make metrics query service establish connection to JobManager > - > > Key: FLINK-11127 > URL: https://issues.apache.org/jira/browse/FLINK-11127 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Kubernetes, Metrics >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Priority: Major > > As part of FLINK-10247, the internal metrics query service has been separated > into its own actor system. Before this change, the JobManager (JM) queried > TaskManager (TM) metrics via the TM actor. Now, the JM needs to establish a > separate connection to the TM metrics query service actor. > In the context of Kubernetes, this is problematic as the JM will typically > *not* be able to resolve the TMs by name, resulting in warnings as follows: > {code} > 2018-12-11 08:32:33,962 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183] has > failed, address is now gated for [50] ms. Reason: [Association failed with > [akka.tcp://flink-metrics@flink-task-manager-64b868487c-x9l4b:39183]] Caused > by: [flink-task-manager-64b868487c-x9l4b: Name does not resolve] > {code} > In order to expose the TMs by name in Kubernetes, users require a service > *for each* TM instance which is not practical. > This currently results in the web UI not being able to display some basic metrics > about the number of sent records. You can reproduce this by following the READMEs > in {{flink-container/kubernetes}}. 
> This worked before, because the JM is typically exposed via a service with a > known name and the TMs establish the connection to it which the metrics query > service piggybacked on. > A potential solution to this might be to let the query service connect to the > JM similar to how the TMs register. > I tagged this ticket as an improvement, but in the context of Kubernetes I > would consider this to be a bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10748) Jobmanager in HA setup, redirects (307) don't work when behind a load balancer
[ https://issues.apache.org/jira/browse/FLINK-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10748: --- Labels: pull-request-available (was: ) > Jobmanager in HA setup, redirects (307) don't work when behind a load balancer > -- > > Key: FLINK-10748 > URL: https://issues.apache.org/jira/browse/FLINK-10748 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Jeroen Steggink >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > > In a HA Jobmanager setup, connecting to a follower results in a redirect > (HTTP/1.1 307 Temporary Redirect) to the leader. However, it redirects to an > ip address (why is it not the hostname?) which is in another network and not > reachable. I have configured hostnames, not ip addresses. > Wouldn't it be better to proxy the request to another jobmanager instead of > using a redirect? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann opened a new pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher
tillrohrmann opened a new pull request #7295: [FLINK-10748] Decouple RestServerEndpoint from Dispatcher URL: https://github.com/apache/flink/pull/7295 ## What is the purpose of the change This commit decouples the RestServerEndpoint from the Dispatcher by converting the RedirectHandler into LeaderRetrievalHandler which only retrieves the gateway of the current leader instead of redirecting the client. This is possible since the serving RestServerEndpoint no longer needs to be colocated with the Dispatcher and JobMaster after all RPC requests are serializable (e.g. no direct access on the ExecutionGraph is needed). With this change every RestServerEndpoint will automatically proxy to the current leader without needing to redirect the requesting client. ## Brief change log - Remove redirection logic from `RedirectHandler` - Rename `RedirectHandler` into `LeaderRetrievalHandler` - Remove `restAddressFuture` from all REST handlers except for the `JarListHandler` ## Verifying this change - Tested manually - Adapted former `RedirectHandlerTest` which is now called `LeaderRetrievalHandlerTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
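The difference between redirecting the client and proxying to the leader, as described in the pull request above, can be sketched in a few lines of Python. Everything here is hypothetical illustration (the function `handle_with_leader`, the callables it takes and the tuple it returns); it is not the actual `LeaderRetrievalHandler`.

```python
def handle_with_leader(get_leader_gateway, handler, request):
    """Serve a request locally by talking to whichever gateway is the leader.

    get_leader_gateway: callable returning the current leader gateway or None.
    handler: callable (gateway, request) -> response body.
    Returns a (status_code, body) tuple.
    """
    gateway = get_leader_gateway()
    if gateway is None:
        # No leader is known yet: answer 503 instead of redirecting the client.
        return 503, "Service temporarily unavailable: no leader available."
    # The handler uses the (possibly remote) leader gateway directly, so the
    # client never has to follow a 307 redirect to an unreachable address.
    return 200, handler(gateway, request)
```

Because the endpoint answers on the address the client already reached, this approach should also work behind a load balancer, which is the situation reported in FLINK-10748.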
[GitHub] yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server
yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#issuecomment-446645970 @zentol updated this PR
[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server
yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r241075877 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { Review comment: hmm, yes, the generic Exception cannot provide more detailed information.
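The approach in the diff above (parse a port range definition, then try the ports one by one) can be sketched in Python as follows. The functions `parse_port_range` and `bind_first_free` are hypothetical, and the accepted syntax (comma-separated single ports and `start-end` ranges) is an assumption of this sketch rather than a statement about `NetUtils.getPortRangeFromString`.

```python
import socket
from typing import Iterator, Tuple


def parse_port_range(spec: str) -> Iterator[int]:
    """Yield ports from a definition such as "8081" or "8081,8090-8092"."""
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            start, end = (int(p) for p in part.split("-", 1))
        else:
            start = end = int(part)
        if not (0 <= start <= end <= 65535):
            raise ValueError(f"Invalid port range: {part!r}")
        yield from range(start, end + 1)


def bind_first_free(address: str, spec: str) -> Tuple[socket.socket, int]:
    """Bind to the first port of the range that is not already in use."""
    for port in parse_port_range(spec):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((address, port))
            return sock, port
        except OSError:
            # Port is taken (or not permitted); release the socket, try the next.
            sock.close()
    raise OSError(f"Could not bind to any port in range {spec!r}")
```

Raising a specific `ValueError` for a malformed definition, rather than catching a generic exception, is in the spirit of the review comment above.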
[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server
yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r241074645 ## File path: flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java ## @@ -37,6 +37,17 @@ .withDeprecatedKeys(WebOptions.ADDRESS.key(), ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS.key()) .withDescription("The address that the server binds itself."); + /** +* The port range that the server could bind itself to. +*/ + public static final ConfigOption<String> BIND_PORT = + key("rest.bind-port") Review comment: You are right.
[jira] [Assigned] (FLINK-10748) Jobmanager in HA setup, redirects (307) don't work when behind a load balancer
[ https://issues.apache.org/jira/browse/FLINK-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-10748: - Assignee: Till Rohrmann > Jobmanager in HA setup, redirects (307) don't work when behind a load balancer > -- > > Key: FLINK-10748 > URL: https://issues.apache.org/jira/browse/FLINK-10748 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Jeroen Steggink >Assignee: Till Rohrmann >Priority: Critical > > In a HA Jobmanager setup, connecting to a follower results in a redirect > (HTTP/1.1 307 Temporary Redirect) to the leader. However, it redirects to an > ip address (why is it not the hostname?) which is in another network and not > reachable. I have configured hostnames, not ip addresses. > Wouldn't it be better to proxy the request to another jobmanager instead of > using a redirect? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10748) Jobmanager in HA setup, redirects (307) don't work when behind a load balancer
[ https://issues.apache.org/jira/browse/FLINK-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719118#comment-16719118 ] Till Rohrmann commented on FLINK-10748: --- Thanks for reporting this issue [~jeroens]. I think you are right that the current setup is not ideal. I think by now, the {{RestServerEndpoint}} no longer needs to be colocated with the {{Dispatcher}} and the {{JobMaster}} as it was the case before. Thus, it should be possible to simply use the retrieved leader gateway in the {{RedirectHandler}} and give it to the requested handler (also if this means that the communication with the {{Dispatcher}} is a remote call) instead of redirecting the client to another {{RestServerEndpoint}}. > Jobmanager in HA setup, redirects (307) don't work when behind a load balancer > -- > > Key: FLINK-10748 > URL: https://issues.apache.org/jira/browse/FLINK-10748 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Jeroen Steggink >Priority: Critical > > In a HA Jobmanager setup, connecting to a follower results in a redirect > (HTTP/1.1 307 Temporary Redirect) to the leader. However, it redirects to an > ip address (why is it not the hostname?) which is in another network and not > reachable. I have configured hostnames, not ip addresses. > Wouldn't it be better to proxy the request to another jobmanager instead of > using a redirect? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11067) Port TableEnvironments to Java
[ https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719091#comment-16719091 ] Dawid Wysakowicz edited comment on FLINK-11067 at 12/12/18 3:29 PM: I think that, as a user should only import either {{flink-table-api-java}} or {{flink-table-api-scala}} and never both of them, we can have: {code} // in flink-table-api-java module package org.apache.flink.table.api; public final class TableEnvironment { static org.apache.flink.table.api.java.BatchTableEnvironment getTableEnvironment(BatchEnvironment); static org.apache.flink.table.api.java.StreamTableEnvironment getTableEnvironment(StreamEnvironment); private TableEnvironment() { } } // in flink-table-api-scala module package org.apache.flink.table.api; object TableEnvironment { def getTableEnvironment(BatchEnvironment): org.apache.flink.table.api.scala.BatchTableEnvironment = ... def getTableEnvironment(StreamEnvironment): org.apache.flink.table.api.scala.StreamTableEnvironment = ... } {code} The rest would remain the same as in [~twalthr]'s suggestion. I think that would retain backwards compatibility. was (Author: dawidwys): I think a user should only import either {{flink-table-api-java}} or {{flink-table-api-scala}}, never both of them. I think we can have: {code} // in flink-table-api-java module package org.apache.flink.table.api; public final class TableEnvironment { static org.apache.flink.table.api.java.BatchTableEnvironment getTableEnvironment(BatchEnvironment); static org.apache.flink.table.api.java.StreamTableEnvironment getTableEnvironment(StreamEnvironment); private TableEnvironment() { } } // in flink-table-api-scala module package org.apache.flink.table.api; object TableEnvironment { def getTableEnvironment(BatchEnvironment): org.apache.flink.table.api.scala.BatchTableEnvironment = ... 
def getTableEnvironment(StreamEnvironment): org.apache.flink.table.api.scala.StreamTableEnvironment = ... } {code} The rest would remain the same as in [~twalthr]'s suggestion. I think that would retain backwards compatibility. > Port TableEnvironments to Java > -- > > Key: FLINK-11067 > URL: https://issues.apache.org/jira/browse/FLINK-11067 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Major > > This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, > {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided > and discussed. Some refactoring and clean up might be necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java
[ https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719091#comment-16719091 ] Dawid Wysakowicz commented on FLINK-11067: -- I think a user should only import either {{flink-table-api-java}} or {{flink-table-api-scala}}, never both of them. I think we can have:
{code}
// in flink-table-api-java module
package org.apache.flink.table.api;

public final class TableEnvironment {
    static org.apache.flink.table.api.java.BatchTableEnvironment getTableEnvironment(BatchEnvironment);
    static org.apache.flink.table.api.java.StreamTableEnvironment getTableEnvironment(StreamEnvironment);
    private TableEnvironment() { }
}

// in flink-table-api-scala module
package org.apache.flink.table.api;

object TableEnvironment {
    def getTableEnvironment(BatchEnvironment): org.apache.flink.table.api.scala.BatchTableEnvironment = ...
    def getTableEnvironment(StreamEnvironment): org.apache.flink.table.api.scala.StreamTableEnvironment = ...
}
{code}
The rest would remain the same as in [~twalthr]'s suggestion. I think that would retain backwards compatibility. > Port TableEnvironments to Java > -- > > Key: FLINK-11067 > URL: https://issues.apache.org/jira/browse/FLINK-11067 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Major > > This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, > {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided > and discussed. Some refactoring and clean up might be necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials
[ https://issues.apache.org/jira/browse/FLINK-11126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11126: --- Labels: pull-request-available (was: ) > Filter out AMRMToken in the TaskManager credentials > --- > > Key: FLINK-11126 > URL: https://issues.apache.org/jira/browse/FLINK-11126 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Affects Versions: 1.6.2, 1.7.0 >Reporter: Paul Lin >Assignee: Paul Lin >Priority: Minor > Labels: pull-request-available > > Currently, Flink JobManager propagates its storage tokens to TaskManager to > meet the requirement of YARN log aggregation (see FLINK-6376). But in this > way the AMRMToken is also included in the TaskManager credentials, which > could be potentially insecure. We should filter out AMRMToken before setting > the tokens to TaskManager's container launch context. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] link3280 opened a new pull request #7294: [FLINK-11126][YARN][security] Filter out AMRMToken in the TaskManager credentials
link3280 opened a new pull request #7294: [FLINK-11126][YARN][security] Filter out AMRMToken in the TaskManager credentials URL: https://github.com/apache/flink/pull/7294 ## What is the purpose of the change Currently, Flink JobManager propagates its storage tokens to TaskManager to meet the requirement of YARN log aggregation (see FLINK-6376). But in this way the AMRMToken is also included in the TaskManager credentials, which could be potentially insecure. The PR filters out AMRMToken before setting the tokens to TaskManager's container launch context, and adds checks for delegation tokens that JobManager prepares for TaskManagers. ## Brief change log - *Filter out AMRMToken before setting the tokens to the TaskManager container context.* ## Verifying this change This change added tests and can be verified as follows: - *Extended YarnApplicationMasterRunnerTest to check delegation tokens in the TaskManager executor context.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
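The filtering step described in the change log can be sketched in plain Java as follows. This is not the code from the pull request: the {{Token}} class is a hypothetical stand-in for a Hadoop token that carries only a kind and a service, {{withoutAmRmToken}} is an illustrative helper name, and the kind string {{YARN_AM_RM_TOKEN}} is assumed to be the kind name YARN assigns to the AMRMToken.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model of dropping the AM-RM token from a set of credentials. */
public class AmRmTokenFilterSketch {

    // Assumption: the kind name YARN uses for the AMRMToken.
    static final String AMRM_TOKEN_KIND = "YARN_AM_RM_TOKEN";

    /** Hypothetical stand-in for a Hadoop token, reduced to kind and service. */
    static final class Token {
        final String kind;
        final String service;

        Token(String kind, String service) {
            this.kind = kind;
            this.service = service;
        }
    }

    /** Returns a new list holding every token except those of the AM-RM kind. */
    static List<Token> withoutAmRmToken(List<Token> tokens) {
        List<Token> filtered = new ArrayList<>();
        for (Token token : tokens) {
            if (!AMRM_TOKEN_KIND.equals(token.kind)) {
                filtered.add(token);
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        List<Token> jobManagerTokens = Arrays.asList(
                new Token("HDFS_DELEGATION_TOKEN", "ha-hdfs:ns1"),
                new Token(AMRM_TOKEN_KIND, "rm-host:8030"));

        // Only the storage token would be handed to the TaskManager container.
        for (Token token : withoutAmRmToken(jobManagerTokens)) {
            System.out.println(token.kind + " @ " + token.service);
        }
    }
}
```

The helper copies into a new list instead of mutating its input, so in this model the JobManager's own credentials stay untouched while the TaskManager receives the reduced set.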
[jira] [Updated] (FLINK-11144) Run Tests on Java 9
[ https://issues.apache.org/jira/browse/FLINK-11144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-11144: - Description: When using Java 9, tests should run when invoking {{mvn clean install -Dfast}} Acceptance criteria: * Project compiles with {{mvn clean install -Dfast -DskipTests}} * Tests are runnable was:When using Java 9, tests should run when invoking {{mvn clean install -Dfast}} > Run Tests on Java 9 > --- > > Key: FLINK-11144 > URL: https://issues.apache.org/jira/browse/FLINK-11144 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Critical > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When using Java 9, tests should run when invoking {{mvn clean install -Dfast}} > Acceptance criteria: > * Project compiles with {{mvn clean install -Dfast -DskipTests}} > * Tests are runnable -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11144) Run Tests on Java 9
[ https://issues.apache.org/jira/browse/FLINK-11144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11144: --- Labels: pull-request-available (was: ) > Run Tests on Java 9 > --- > > Key: FLINK-11144 > URL: https://issues.apache.org/jira/browse/FLINK-11144 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Critical > Labels: pull-request-available > Fix For: 1.8.0 > > > When using Java 9, tests should run when invoking {{mvn clean install -Dfast}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] GJL opened a new pull request #7293: [FLINK-11144][tests] Make tests run on Java 9
GJL opened a new pull request #7293: [FLINK-11144][tests] Make tests run on Java 9 URL: https://github.com/apache/flink/pull/7293 ## What is the purpose of the change *Make tests run on Java 9 by upgrading plugins to minimum supported version for Java 9: https://cwiki.apache.org/confluence/display/MAVEN/Java+9+-+Jigsaw* cc: @zentol ## Brief change log - *Upgrade surefire plugin.* - *Upgrade shade plugin.* - *Upgrade compiler plugin.* ## Verifying this change This change is already covered by existing tests, such as *all tests*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-11085) NoClassDefFoundError in presto-s3 filesystem
[ https://issues.apache.org/jira/browse/FLINK-11085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-11085. --- Resolution: Fixed Fixed via 1.8.0: https://github.com/apache/flink/commit/ea8373e05048ad9dc2a42a2f8d10a9049559e975 1.7.1: https://github.com/apache/flink/commit/4ab682bd76ecaa5fef67b077880665b5d86b03c2 > NoClassDefFoundError in presto-s3 filesystem > > > Key: FLINK-11085 > URL: https://issues.apache.org/jira/browse/FLINK-11085 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > Attachments: image-2018-12-07-13-04-06-563.png > > Time Spent: 10m > Remaining Estimate: 0h > > A user has reported an issue on the ML where using the presto-s3 filesystem > fails with an exception due to a missing class. The missing class is indeed > filtered out in the shade-plugin configuration. 
> {code:java} > java.lang.NoClassDefFoundError: > org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus > at > org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446) > at > org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann closed pull request #7261: [FLINK-11085][s3] Fix inclusion of presto hadoop classes
tillrohrmann closed pull request #7261: [FLINK-11085][s3] Fix inclusion of presto hadoop classes URL: https://github.com/apache/flink/pull/7261 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index 7d19fac97e3..9a5d46e22aa 100644 --- a/pom.xml +++ b/pom.xml @@ -1478,7 +1478,8 @@ under the License. false true ${project.basedir}/target/dependency-reduced-pom.xml - + + * This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11028) Disable deployment of flink-fs-tests
[ https://issues.apache.org/jira/browse/FLINK-11028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719029#comment-16719029 ] Chesnay Schepler commented on FLINK-11028: -- eh maybe not :D the tests required flink-streaming-java, which requires flink-runtime, which requires flink-hadoop-fs for testing, resulting in a cyclic dependency. Skipping the deployment can be our last-minute backup plan if no one picks this issue up until the next feature freeze. > Disable deployment of flink-fs-tests > > > Key: FLINK-11028 > URL: https://issues.apache.org/jira/browse/FLINK-11028 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.8.0 > > > The {{flink-fs-tests}} module only contains tests, without any production > code or test utilities. As such there should never be a use-case where one > would specify a dependency on this module, hence we could skip the deployment > entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on issue #6880: [FLINK-10571][storm] Remove topology support
zentol commented on issue #6880: [FLINK-10571][storm] Remove topology support URL: https://github.com/apache/flink/pull/6880#issuecomment-446609832 @TisonKun Someone has to review the PR to check that only topology-related parts have been removed, without impacting the bolt/spout wrapper functionality. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r241037567 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -283,6 +275,14 @@ public JobExecutionResult execute(String jobName) throws ProgramInvocationExcept return executeRemotely(streamGraph, jarFiles); } + /** +* Execute the job with savepoint restore. +*/ + public JobExecutionResult execute(String jobName, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException { + this.savepointRestoreSettings = savepointRestoreSettings; + return execute(jobName); Review comment: I don't want to repeat what is already done in `execute` and `executeRemotely`. The member variable is an implementation detail that does not concern the interface contract and, as mentioned, all other execution parameters are already derived from the instance. (BTW execute is a one-time operation with side effect - it clears the transformations.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11028) Disable deployment of flink-fs-tests
[ https://issues.apache.org/jira/browse/FLINK-11028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719024#comment-16719024 ] Chesnay Schepler commented on FLINK-11028: -- The contained tests are exclusively for HDFS, so this might be as easy as copying them to {{flink-hadoop-fs}} and adding a few test dependencies. Running this approach on travis: https://travis-ci.org/zentol/flink/builds/467022202 > Disable deployment of flink-fs-tests > > > Key: FLINK-11028 > URL: https://issues.apache.org/jira/browse/FLINK-11028 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.8.0 > > > The {{flink-fs-tests}} module only contains tests, without any production > code or test utilities. As such there should never be a use-case where one > would specify a dependency on this module, hence we could skip the deployment > entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11101) Ineffective openjdk exclusion Presto S3 FileSystem module
[ https://issues.apache.org/jira/browse/FLINK-11101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11101: --- Labels: pull-request-available (was: ) > Ineffective openjdk exclusion Presto S3 FileSystem module > - > > Key: FLINK-11101 > URL: https://issues.apache.org/jira/browse/FLINK-11101 > Project: Flink > Issue Type: Bug > Components: Build System, FileSystem >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > The {{presto-s3-fs}} module defines the following exclusion in the > shade-plugin: > {code:java} > >org.openjdk.jol > {code} > This exclusion has no effect on the resulting artifact. We could think about > removing it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol opened a new pull request #7292: [FLINK-11101][S3] Ban openjdk.jol dependencies
zentol opened a new pull request #7292: [FLINK-11101][S3] Ban openjdk.jol dependencies URL: https://github.com/apache/flink/pull/7292 ## What is the purpose of the change This PR removes a redundant exclusion for `openjdk.jol` from the presto-s3-filesystem pom, and adds a safeguard via the `maven-enforcer-plugin` to prevent this dependency from showing up in the dependency tree again. This guarantees that the dependency section properly excludes this dependency. This is a better approach than relying on the shade-plugin configuration since the dependency could still show up in the dependency tree of the project (if the dependency section does not exclude it correctly), which could ring alarm bells during release testing for no reason. ## Verifying this change You can manually verify the effectiveness of the enforcer by removing the `openjdk.jol` exclusion from `presto-hive` and running `mvn validate`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services