[GitHub] [flink] flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C
flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C URL: https://github.com/apache/flink/pull/10010#issuecomment-546787757 ## CI report: * 4b0f9efdc169d3b6375d469cd3512e87e98f0f19 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133758455) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339408894 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } + + final List exclusiveSegments = globalPool.requestMemorySegments(); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(0, globalPool.getNumberOfAvailableMemorySegments()); + + final AtomicBoolean completeFlag = new AtomicBoolean(false); + CompletableFuture availableFuture = globalPool.isAvailable(); + availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); + + // recycle one buffer + globalPool.recycle(segments.get(0)); + assertTrue(completeFlag.get()); + assertTrue(availableFuture.isDone()); + assertTrue(globalPool.isAvailable().isDone()); + assertEquals(1, 
globalPool.getNumberOfAvailableMemorySegments()); + + CheckedThread asyncRequest = new CheckedThread() { + @Override + public void go() throws Exception { + exclusiveSegments.addAll(globalPool.requestMemorySegments()); + } + }; + asyncRequest.start(); + + // wait until no buffer is available + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L)); + while (globalPool.getNumberOfAvailableMemorySegments() > 0) { + Thread.sleep(50); + if (!deadline.hasTimeLeft()) { + fail("Waiting timeout."); + } + } + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + for (int i = 1; i < numberOfSegmentsToRequest; ++i) { + globalPool.recycle(segments.get(i)); + } + segments.clear(); + + asyncRequest.sync(); + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(numBuffers, exclusiveSegments.size()); + assertFalse(globalPool.isAvailable().isDone()); + + globalPool.recycleMemorySegments(exclusiveSegments); + exclusiveSegments.clear(); + assertTrue(globalPool.isAvailable().isDone()); + assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments()); Review comment: ``` globalPool.recycleMemorySegments(exclusiveSegments); exclusiveSegments.clear(); assertTrue(globalPool.isAvailable().isDone()); assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments()); ``` This part is only for resource recycle at last, so we can remove the last
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339407704 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +import java.util.Collections + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. + * The original limit will still be retained. + * + * The reasons why the limit still be retained: + * 1.If the source is required to return the exact number of limit number, the implementation + * of the source is highly required. The source is required to accurately control the record + * number of split, and the parallelism setting also need to be adjusted accordingly. + * 2.When remove the limit, maybe filter will be pushed down to the source after limit pushed + * down. The source need know it should do limit first and do the filter later, it is hard to + * implement. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. 
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && +fetch != null + +var supportPushDown = false +if (onlyLimit) { + supportPushDown = call.rel(1).asInstanceOf[TableScan] + .getTable.unwrap(classOf[TableSourceTable[_]]) match { +case table: TableSourceTable[_] => + table.tableSource match { +case source: LimitableTableSource[_] => !source.isLimitPushedDown +case _ => false + } +case _ => false + } +} +supportPushDown + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val sort = call.rel(0).asInstanceOf[Sort] +val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] +val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable] +val limit = RexLiteral.intValue(sort.fetch) +val relBuilder = call.builder() +val newRelOptTable = applyLimit(limit, relOptTable, relBuilder) +val newScan = scan.copy(scan.getTraitSet, newRelOptTable) + +val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource +val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource + +if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown +&& newTableSource.explainSource().equals(oldTableSource.explainSource)) { + throw new TableException("Failed to push limit into table source! " Review comment: Related PR: https://github.com/apache/flink/pull/8468 Something could be done in `TableSource.explainSource`; it is a default method and can contain some logic. But I think this check can stay.
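The `matches` condition quoted in this diff can be read as a small predicate: push down only when there is no sort collation, the offset is absent or zero, and a fetch value exists. A minimal Java sketch of that reading, assuming hypothetical plain parameters (`sortFieldCount`, `offset`, `fetch`) in place of Calcite's `Sort` node and `RexLiteral` values; it is an illustration, not the rule's actual code:

```java
/** Hypothetical helper mirroring the onlyLimit condition; not part of Flink or Calcite. */
public class LimitPushDownCheck {

    /**
     * Returns true when the sort node is a pure LIMIT:
     * no collation fields, offset missing or zero, and a fetch value present.
     */
    public static boolean isPureLimit(int sortFieldCount, Integer offset, Integer fetch) {
        boolean noCollation = sortFieldCount == 0;
        // The null check runs first, so unboxing offset is safe here.
        boolean zeroOffset = offset == null || offset == 0;
        boolean hasFetch = fetch != null;
        return noCollation && zeroOffset && hasFetch;
    }
}
```

Under these assumptions, `LIMIT 10` qualifies, while `LIMIT 10 OFFSET 5` and `ORDER BY a LIMIT 10` do not.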
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339408191 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } + + final List exclusiveSegments = globalPool.requestMemorySegments(); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(0, globalPool.getNumberOfAvailableMemorySegments()); + + final AtomicBoolean completeFlag = new AtomicBoolean(false); + CompletableFuture availableFuture = globalPool.isAvailable(); + availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); + + // recycle one buffer + globalPool.recycle(segments.get(0)); + assertTrue(completeFlag.get()); + assertTrue(availableFuture.isDone()); + assertTrue(globalPool.isAvailable().isDone()); + assertEquals(1, 
globalPool.getNumberOfAvailableMemorySegments()); + + CheckedThread asyncRequest = new CheckedThread() { + @Override + public void go() throws Exception { + exclusiveSegments.addAll(globalPool.requestMemorySegments()); + } + }; + asyncRequest.start(); + + // wait until no buffer is available + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L)); + while (globalPool.getNumberOfAvailableMemorySegments() > 0) { + Thread.sleep(50); + if (!deadline.hasTimeLeft()) { + fail("Waiting timeout."); + } + } + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + for (int i = 1; i < numberOfSegmentsToRequest; ++i) { + globalPool.recycle(segments.get(i)); + } + segments.clear(); + + asyncRequest.sync(); + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(numBuffers, exclusiveSegments.size()); Review comment: It is not very strict to use `numBuffers` here. Actually it should be `2 * numberOfSegmentsToRequest` and it is coincidental that `numBuffers = 2 * numberOfSegmentsToRequest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339408562 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +import java.util.Collections + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. + * The original limit will still be retained. + * + * The reasons why the limit still be retained: + * 1.If the source is required to return the exact number of limit number, the implementation + * of the source is highly required. The source is required to accurately control the record + * number of split, and the parallelism setting also need to be adjusted accordingly. + * 2.When remove the limit, maybe filter will be pushed down to the source after limit pushed + * down. The source need know it should do limit first and do the filter later, it is hard to + * implement. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. 
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && Review comment: It can be. I think it is the third reason to keep the original limit RelNode.
[jira] [Commented] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
[ https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960766#comment-16960766 ] Jiayi Liao commented on FLINK-14498: [~kevin.cyj] Thanks for your explanation :). > Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool > -- > > Key: FLINK-14498 > URL: https://issues.apache.org/jira/browse/FLINK-14498 > Project: Flink > Issue Type: Task > Components: Runtime / Network >Reporter: zhijiang >Assignee: Yingjie Cao >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > If the LocalBufferPool can not request available buffer from > NetworkBufferPool, it would wait for 2 seconds before trying to request again > in a loop way. Therefore it would bring some delays in practice. > To improve this interaction, we could introduce NetworkBufferPool#isAvailable > to return a future which would be monitored by LocalBufferPool. Then once > there are available buffers in NetworkBufferPool, it would complete this > future to notify LocalBufferPool immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
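The idea in the issue description, replacing the 2-second retry loop with a future that completes as soon as a buffer is returned, could be sketched roughly as follows. `AvailabilityPool` and all of its members are hypothetical and far simpler than Flink's `NetworkBufferPool`; the sketch only assumes the behaviour the quoted test checks (the future is done while segments are free, not done when the pool is empty, and completed on recycle):

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified pool; NOT Flink's NetworkBufferPool. */
public class AvailabilityPool {
    // Shared, already-completed future meaning "segments are available now".
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private CompletableFuture<Void> availability = AVAILABLE;

    public AvailabilityPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.add(new byte[segmentSize]);
        }
        if (free.isEmpty()) {
            availability = new CompletableFuture<>();
        }
    }

    /** Done while at least one segment is free; pending while the pool is empty. */
    public synchronized CompletableFuture<Void> isAvailable() {
        return availability;
    }

    /** Returns a free segment, or null if none is left. */
    public synchronized byte[] request() {
        byte[] segment = free.poll();
        if (free.isEmpty() && availability.isDone()) {
            // The pool just ran dry: hand out a fresh, pending future.
            availability = new CompletableFuture<>();
        }
        return segment;
    }

    /** Returns a segment and notifies waiters immediately instead of letting them poll. */
    public void recycle(byte[] segment) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            free.add(segment);
            if (!availability.isDone()) {
                toComplete = availability;
                availability = AVAILABLE;
            }
        }
        // Complete outside the lock so callbacks do not run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

A consumer would register a callback with `isAvailable().whenComplete(...)` rather than sleeping and retrying; completing the future outside the lock is a design choice of this sketch, not something stated in the thread.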
[jira] [Commented] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
[ https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960764#comment-16960764 ] Yingjie Cao commented on FLINK-14498: - [~wind_ljy] The PR is based on another PR [https://github.com/apache/flink/pull/9905] which has not been merged yet. Only the last commit is relevant to this JIRA issue. I will rebase once the dependent PR is merged. > Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool > -- > > Key: FLINK-14498 > URL: https://issues.apache.org/jira/browse/FLINK-14498 > Project: Flink > Issue Type: Task > Components: Runtime / Network >Reporter: zhijiang >Assignee: Yingjie Cao >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > If the LocalBufferPool can not request available buffer from > NetworkBufferPool, it would wait for 2 seconds before trying to request again > in a loop way. Therefore it would bring some delays in practice. > To improve this interaction, we could introduce NetworkBufferPool#isAvailable > to return a future which would be monitored by LocalBufferPool. Then once > there are available buffers in NetworkBufferPool, it would complete this > future to notify LocalBufferPool immediately.
[GitHub] [flink] zhijiangW merged pull request #9905: [FLINK-14396][network] Implement rudimentary non-blocking network output
zhijiangW merged pull request #9905: [FLINK-14396][network] Implement rudimentary non-blocking network output URL: https://github.com/apache/flink/pull/9905
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339407219 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.io.CollectionInputFormat +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.sources._ +import org.apache.flink.types.Row + +import scala.collection.JavaConverters._ + +/** + * The table source which support push-down the limit to the source. 
+ */ +class TestLimitableTableSource( +data: Seq[Row], +rowType: RowTypeInfo, +var limit: Long = -1, +var limitablePushedDown: Boolean = false) + extends StreamTableSource[Row] + with LimitableTableSource[Row] { + + override def isBounded = true + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +val dataSet = if (limit > 0) { + data.take(limit.toInt).asJava +} else { + data.asJava +} +execEnv.createInput( + new CollectionInputFormat(dataSet, rowType.createSerializer(new ExecutionConfig)), + rowType) + } + + override def applyLimit(limit: Long): TableSource[Row] = { +new TestLimitableTableSource(data, rowType, limit, limitablePushedDown) + } + + override def isLimitPushedDown: Boolean = limitablePushedDown + + override def getReturnType: TypeInformation[Row] = rowType + + override def explainSource(): String = { +if (limit > 0 && limit < Long.MaxValue) { + "limit: " + limit +} else if (limitablePushedDown) { + "limitablePushedDown" Review comment: Yes, I will add: ``` if (limit == 0) { throw new RuntimeException("limit 0 should be optimize to single values.") } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339407217 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + * + * After pushing down, source only needs to try its best to limit the number of output records, + * but does not need to guarantee that the number must be less than or equal to the limit. Review comment: OK, that makes sense.
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339407007 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } + + final List exclusiveSegments = globalPool.requestMemorySegments(); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(0, globalPool.getNumberOfAvailableMemorySegments()); + + final AtomicBoolean completeFlag = new AtomicBoolean(false); + CompletableFuture availableFuture = globalPool.isAvailable(); + availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); + + // recycle one buffer + globalPool.recycle(segments.get(0)); + assertTrue(completeFlag.get()); + assertTrue(availableFuture.isDone()); + assertTrue(globalPool.isAvailable().isDone()); + assertEquals(1, 
globalPool.getNumberOfAvailableMemorySegments()); + + CheckedThread asyncRequest = new CheckedThread() { + @Override + public void go() throws Exception { + exclusiveSegments.addAll(globalPool.requestMemorySegments()); + } + }; + asyncRequest.start(); + + // wait until no buffer is available + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L)); + while (globalPool.getNumberOfAvailableMemorySegments() > 0) { + Thread.sleep(50); + if (!deadline.hasTimeLeft()) { + fail("Waiting timeout."); + } + } + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + for (int i = 1; i < numberOfSegmentsToRequest; ++i) { + globalPool.recycle(segments.get(i)); + } + segments.clear(); + + asyncRequest.sync(); + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(numBuffers, exclusiveSegments.size()); + assertFalse(globalPool.isAvailable().isDone()); Review comment: This duplicates the assertion on line 601.
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339406809 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + * + * After pushing down, source only needs to try its best to limit the number of output records, + * but does not need to guarantee that the number must be less than or equal to the limit. Review comment: maybe, if the total records are less than the limit, it could be. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
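The Javadoc under discussion says the source only makes a best effort, which is why the planner keeps the original limit. A minimal Java sketch of that interplay, assuming a hypothetical source that applies the pushed-down limit to each split independently (the class and method names below are illustrative and not part of Flink):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of a best-effort limit; not Flink's LimitableTableSource. */
public class BestEffortLimit {

    /**
     * Assumed source behaviour: every split truncates its own output to the limit,
     * so the combined output may still exceed the limit.
     */
    public static List<Integer> readWithPushedLimit(List<List<Integer>> splits, long limit) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> split : splits) {
            out.addAll(split.stream().limit(limit).collect(Collectors.toList()));
        }
        return out;
    }

    /** The limit operator retained in the plan trims the result to the exact count. */
    public static List<Integer> retainedLimit(List<Integer> records, long limit) {
        return records.stream().limit(limit).collect(Collectors.toList());
    }
}
```

With two splits of four records each and a limit of 3, the sketched source emits six records and the retained limit reduces them to three; if the source holds fewer records than the limit, the pushed-down limit alone already yields at most that many.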
[jira] [Comment Edited] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
[ https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960762#comment-16960762 ] Jiayi Liao edited comment on FLINK-14498 at 10/28/19 5:27 AM: -- [~zjwang][~kevin.cyj] I may not be familiar with the whole story, but I have a question after reviewing the PR. According to the JIRA description and what I've understood, the problem happens when {{LocalBufferPool}} requests a buffer from {{NetworkBufferPool}}, which means the scope should be narrowed down to {{LocalBufferPool}} and {{NetworkBufferPool}}. But why do you extend {{isAvailable}} to the {{RecordWriter}}-related classes? Maybe this is a stupid question, but I'm a bit confused. was (Author: wind_ljy): [~zjwang][~kevin.cyj] I may not be familiar with the whole story, but I have a question after reviewing the PR. According to the JIRA description and what I've understood, the problem happens when {{LocalBufferPool}} requests a buffer from {{NetworkBufferPool}}, which means the scope should be narrowed down to LocalBufferPool and NetworkBufferPool. But why do you extend {{isAvailable}} to the {{RecordWriter}}-related classes? Maybe this is a stupid question, but I'm a bit confused. > Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool > -- > > Key: FLINK-14498 > URL: https://issues.apache.org/jira/browse/FLINK-14498 > Project: Flink > Issue Type: Task > Components: Runtime / Network >Reporter: zhijiang >Assignee: Yingjie Cao >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > If the LocalBufferPool cannot request an available buffer from NetworkBufferPool, it waits for 2 seconds before trying again in a loop, which introduces delays in practice. > To improve this interaction, we could introduce NetworkBufferPool#isAvailable to return a future which would be monitored by LocalBufferPool.
Then, once there are available buffers in NetworkBufferPool, it would complete this future to notify LocalBufferPool immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
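The idea in the description above (replace a fixed 2-second retry with a future that completes as soon as a buffer is returned) can be sketched roughly as follows. This is a minimal illustration only, not Flink's NetworkBufferPool: the class ToyGlobalPool and its methods request/recycle are hypothetical names, and byte arrays stand in for memory segments.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a global buffer pool (hypothetical; not Flink's NetworkBufferPool). */
public class AvailabilitySketch {

    static final class ToyGlobalPool {
        private final ArrayDeque<byte[]> free = new ArrayDeque<>();

        // Completed while at least one segment is free; swapped for a fresh,
        // incomplete future whenever the pool runs empty.
        private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

        ToyGlobalPool(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                free.add(new byte[segmentSize]);
            }
            if (free.isEmpty()) {
                available = new CompletableFuture<>();
            }
        }

        /** Future that a local pool could monitor instead of sleeping and retrying. */
        synchronized CompletableFuture<Void> isAvailable() {
            return available;
        }

        /** Returns a segment, or null if none is free. */
        synchronized byte[] request() {
            byte[] segment = free.poll();
            if (free.isEmpty() && available.isDone()) {
                available = new CompletableFuture<>();
            }
            return segment;
        }

        /** Returns a segment to the pool and notifies waiters right away. */
        synchronized void recycle(byte[] segment) {
            free.add(segment);
            if (!available.isDone()) {
                // For simplicity the future is completed while holding the lock.
                available.complete(null);
            }
        }
    }

    public static void main(String[] args) {
        ToyGlobalPool pool = new ToyGlobalPool(1, 16);
        byte[] segment = pool.request();
        pool.isAvailable().thenRun(() -> System.out.println("buffer available again"));
        pool.recycle(segment);
    }
}
```

In this sketch a waiting consumer registers a callback on the future returned by isAvailable() and is notified on the recycling thread, so no polling interval is involved.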
[jira] [Commented] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
[ https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960762#comment-16960762 ] Jiayi Liao commented on FLINK-14498: [~zjwang][~kevin.cyj] I may not be familiar with the whole story, but I have a question after reviewing the PR. According to the JIRA description and what I've understood, the problem happens when {{LocalBufferPool}} requests a buffer from {{NetworkBufferPool}}, which means the scope should be narrowed down to LocalBufferPool and NetworkBufferPool. But why do you extend {{isAvailable}} to the {{RecordWriter}}-related classes? Maybe this is a stupid question, but I'm a bit confused. > Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool > -- > > Key: FLINK-14498 > URL: https://issues.apache.org/jira/browse/FLINK-14498 > Project: Flink > Issue Type: Task > Components: Runtime / Network >Reporter: zhijiang >Assignee: Yingjie Cao >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > If the LocalBufferPool cannot request an available buffer from NetworkBufferPool, it waits for 2 seconds before trying again in a loop, which introduces delays in practice. > To improve this interaction, we could introduce NetworkBufferPool#isAvailable to return a future which would be monitored by LocalBufferPool. Then, once there are available buffers in NetworkBufferPool, it would complete this future to notify LocalBufferPool immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339406297 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +import java.util.Collections + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. + * The original limit will still be retained. + * + * The reasons why the limit still be retained: + * 1.If the source is required to return the exact number of limit number, the implementation + * of the source is highly required. The source is required to accurately control the record + * number of split, and the parallelism setting also need to be adjusted accordingly. + * 2.When remove the limit, maybe filter will be pushed down to the source after limit pushed + * down. The source need know it should do limit first and do the filter later, it is hard to + * implement. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. 
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && +fetch != null + +var supportPushDown = false +if (onlyLimit) { + supportPushDown = call.rel(1).asInstanceOf[TableScan] + .getTable.unwrap(classOf[TableSourceTable[_]]) match { +case table: TableSourceTable[_] => + table.tableSource match { +case source: LimitableTableSource[_] => !source.isLimitPushedDown +case _ => false + } +case _ => false + } +} +supportPushDown + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val sort = call.rel(0).asInstanceOf[Sort] +val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] +val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable] +val limit = RexLiteral.intValue(sort.fetch) +val relBuilder = call.builder() +val newRelOptTable = applyLimit(limit, relOptTable, relBuilder) +val newScan = scan.copy(scan.getTraitSet, newRelOptTable) + +val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource +val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource + +if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown +&& newTableSource.explainSource().equals(oldTableSource.explainSource)) { + throw new TableException("Failed to push limit into table source! " + + "table source with pushdown capability must override and change " + + "explainSource() API to explain the pushdown applied!") +} + +call.transformTo(sort.copy(sort.getTraitSet,
[jira] [Commented] (FLINK-14026) Manage the resource of Python worker properly
[ https://issues.apache.org/jira/browse/FLINK-14026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960761#comment-16960761 ] Dian Fu commented on FLINK-14026: - [~sunjincheng121] Thanks a lot for the reminder. I have drafted a design doc which is integrated with [FLIP-53|https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management], which proposes fine-grained operator resource management. I will submit it to the DEV mailing list for discussion later on. > Manage the resource of Python worker properly > - > > Key: FLINK-14026 > URL: https://issues.apache.org/jira/browse/FLINK-14026 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Fix For: 1.10.0 > > > For a Flink Table API & SQL job, if it uses Python user-defined functions, the Java operator will launch a separate Python process for Python user-defined function execution. We should make sure that the resources used by the Python process are managed by Flink’s resource management framework. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339406308 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + * + * After pushing down, source only needs to try its best to limit the number of output records, + * but does not need to guarantee that the number must be less than or equal to the limit. Review comment: Btw, regarding `less than or equal to the limit`: the source shouldn't generate fewer records than the limit, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339405733 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + * + * After pushing down, source only needs to try its best to limit the number of output records, + * but does not need to guarantee that the number must be less than or equal to the limit. Review comment: I think this wording just tells the user: there is no hard limit, just do what you can. The comments in `PushLimitIntoTableSourceScanRule` are difficult to understand without the code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339405733 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + * + * After pushing down, source only needs to try its best to limit the number of output records, + * but does not need to guarantee that the number must be less than or equal to the limit. Review comment: I think this wording just tells the user: there is no hard limit, just do what you can. The comments in `PushLimitIntoTableSourceScanRule` are difficult to understand without the code, so I think we can just add a link to `PushLimitIntoTableSourceScanRule`. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
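The best-effort contract discussed in this review thread (the source may emit more rows than the limit, and the retained limit operator enforces the exact count) can be illustrated with a small sketch. This is not the LimitableTableSource API; the method names scanWithPushedLimit and retainedLimit, and the per-split limiting strategy, are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of a best-effort pushed-down limit plus a retained exact limit. */
public class BestEffortLimitSketch {

    /**
     * Hypothetical source: every split stops after `limit` rows on its own,
     * so the combined output can still exceed the limit.
     */
    static List<Integer> scanWithPushedLimit(List<List<Integer>> splits, long limit) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> split : splits) {
            split.stream().limit(limit).forEach(out::add);
        }
        return out;
    }

    /** The limit operator kept in the plan cuts the result down to the exact count. */
    static List<Integer> retainedLimit(List<Integer> rows, long limit) {
        return rows.stream().limit(limit).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> splits = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6));
        List<Integer> scanned = scanWithPushedLimit(splits, 2);
        System.out.println("rows read from source: " + scanned.size());
        System.out.println("rows after retained limit: " + retainedLimit(scanned, 2).size());
    }
}
```

Under these assumptions the source reads far less than a full scan would, while correctness never depends on the source honoring the limit exactly.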
[jira] [Commented] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1
[ https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960759#comment-16960759 ] Shuwen Zhou commented on FLINK-14538: - [~wind_ljy] It was deployed on a YARN cluster. The JobManager command is as follows: {code:java} /usr/lib/jvm/java-1.8.0-openjdk/bin/java -Xms2304m -Xmx2304m -XX:+PrintHeapAtGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -Dlog.file=/ebs_hadoop_data/yarn-data/log/application_156508035_24/container_e20_156508035_24_01_01/jobmanager.log -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint {code} The TaskManager command is: {code:java} /usr/lib/jvm/java-1.8.0-openjdk/bin/java -Xms9830m -Xmx9830m -XX:MaxDirectMemorySize=6554m -XX:+PrintHeapAtGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -Dlog.file=/ebs_hadoop_data/yarn-data/log/application_156508035_24/container_e20_156508035_24_01_000163/taskmanager.log -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . {code} Thanks. I was expecting the NonHeap memory to be reported with its default value. > Metrics of Status.JVM.Memory.NonHeap.Max is always -1 > - > > Key: FLINK-14538 > URL: https://issues.apache.org/jira/browse/FLINK-14538 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Shuwen Zhou >Priority: Minor > Attachments: image-2019-10-28-12-34-53-413.png > > > I'm collecting JobManager and TaskManager status metrics to DataDog. > While all metrics of both JobManager and TaskManager > Status.JVM.Memory.NonHeap.Max is always -1 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes
flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes URL: https://github.com/apache/flink/pull/9854#issuecomment-539497852 ## CI report: * 9adcb150ed88bfa077642f6f590ec4ae9183baf5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130951546) * 7beda6a8760882ae464701d2f9faf5ba4bb9be3a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133562068) * d035936297745e9e7d6189680f0cb2b3c8aec245 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133758460) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1
[ https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960754#comment-16960754 ] Jiayi Liao commented on FLINK-14538: [~jacobc3] This usually means the maximum non-heap memory is not defined. 1. Is this deployed on YARN or on a Flink standalone cluster? 2. Could you show us the java command that starts the JobManager / TaskManager (something like {{/path/to/java -Xmx...}})? > Metrics of Status.JVM.Memory.NonHeap.Max is always -1 > - > > Key: FLINK-14538 > URL: https://issues.apache.org/jira/browse/FLINK-14538 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Shuwen Zhou >Priority: Minor > Attachments: image-2019-10-28-12-34-53-413.png > > > I'm collecting JobManager and TaskManager status metrics to DataDog. > While all metrics of both JobManager and TaskManager > Status.JVM.Memory.NonHeap.Max is always -1 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
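To check what the JVM itself reports, a small standalone probe can help. The sketch below assumes the metric is backed by the JVM's standard MemoryMXBean (an assumption about the Flink side; the class NonHeapMaxSketch and its helper methods are hypothetical). The JDK documents that MemoryUsage#getMax returns -1 when the maximum is undefined.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical probe that prints the non-heap maximum as reported by the JVM. */
public class NonHeapMaxSketch {

    /** Reads the non-heap maximum from the standard memory MXBean. */
    static long nonHeapMax() {
        MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        // getMax() returns -1 if the maximum memory size is undefined.
        return nonHeap.getMax();
    }

    /** Renders the raw value so that -1 is not mistaken for a byte count. */
    static String describe(long max) {
        return max == -1 ? "undefined" : max + " bytes";
    }

    public static void main(String[] args) {
        System.out.println("NonHeap max: " + describe(nonHeapMax()));
    }
}
```

Running this with the same JVM flags as the JobManager or TaskManager command should show whether the -1 originates in the JVM rather than in the metrics reporter.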
[GitHub] [flink] flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C
flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C URL: https://github.com/apache/flink/pull/10010#issuecomment-546787757 ## CI report: * 4b0f9efdc169d3b6375d469cd3512e87e98f0f19 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133758455) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions
flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions URL: https://github.com/apache/flink/pull/10007#issuecomment-546673889 ## CI report: * 2f64299fb74a59f77f14f20bed87191389f58317 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133706395) * 5b16730a6403a9cffdc564924be89121561f2e82 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133755954) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes
flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes URL: https://github.com/apache/flink/pull/9854#issuecomment-539497852 ## CI report: * 9adcb150ed88bfa077642f6f590ec4ae9183baf5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/130951546) * 7beda6a8760882ae464701d2f9faf5ba4bb9be3a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133562068) * d035936297745e9e7d6189680f0cb2b3c8aec245 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI
[ https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960752#comment-16960752 ] Congxian Qiu(klion26) commented on FLINK-14264: --- Kindly ping [~aljoscha]. If this is valid, I just want to add a {{stateBackend}} property to {{CheckpointCoordinatorConfiguration}} and pass this value when initializing {{CheckpointCoordinatorConfiguration}} in {{StreamJobGraphGenerator}}#configureCheckpointing. The backend value will be taken from {{StreamGraph}}. What do you think about this? > Expose CheckpointBackend in checkpoint config RestAPI > - > > Key: FLINK-14264 > URL: https://issues.apache.org/jira/browse/FLINK-14264 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, we can get the checkpoint config from the REST API [1]; the response contains the following information: > * timeout > * min_pause > * max_concurrent > * externalization > It does not contain the type of CheckpointBackend, but in some scenarios we want to get the CheckpointBackend type from REST. This issue proposes adding the simple name of the CheckpointBackend to the {{checkpoints/config}} REST response under the key {{checkpoint_backend}}, so that the response will contain information such as: > * {{timeout}} > * {{min_pause}} > * {{max_concurrent}} > * {{checkpoint_backend}} > * {{externalization}} > > [1] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1
[ https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuwen Zhou updated FLINK-14538: Description: I'm collecting JobManager and TaskManager status metrics to DataDog. While all metrics of both JobManager and TaskManager Status.JVM.Memory.NonHeap.Max is always -1 was: I'm collecting TaskManager status metrics to DataDog. While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 > Metrics of Status.JVM.Memory.NonHeap.Max is always -1 > - > > Key: FLINK-14538 > URL: https://issues.apache.org/jira/browse/FLINK-14538 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Shuwen Zhou >Priority: Minor > Attachments: image-2019-10-28-12-34-53-413.png > > > I'm collecting JobManager and TaskManager status metrics to DataDog. > While all metrics of both JobManager and TaskManager > Status.JVM.Memory.NonHeap.Max is always -1 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1
[ https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuwen Zhou updated FLINK-14538: Summary: Metrics of Status.JVM.Memory.NonHeap.Max is always -1 (was: Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1) > Metrics of Status.JVM.Memory.NonHeap.Max is always -1 > - > > Key: FLINK-14538 > URL: https://issues.apache.org/jira/browse/FLINK-14538 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Shuwen Zhou >Priority: Minor > Attachments: image-2019-10-28-12-34-53-413.png > > > I'm collecting TaskManager status metrics to DataDog. > While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C
flinkbot commented on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C URL: https://github.com/apache/flink/pull/10010#issuecomment-546787757 ## CI report: * 4b0f9efdc169d3b6375d469cd3512e87e98f0f19 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-14538) Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1
[ https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuwen Zhou updated FLINK-14538: Description: I'm collecting TaskManager status metrics to DataDog. While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 was: I'm collecting TaskManager status metrics to DataDog. While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 !image-2019-10-28-12-34-53-413.png! > Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 > - > > Key: FLINK-14538 > URL: https://issues.apache.org/jira/browse/FLINK-14538 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.9.0 >Reporter: Shuwen Zhou >Priority: Minor > Attachments: image-2019-10-28-12-34-53-413.png > > > I'm collecting TaskManager status metrics to DataDog. > While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14538) Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1
Shuwen Zhou created FLINK-14538: --- Summary: Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 Key: FLINK-14538 URL: https://issues.apache.org/jira/browse/FLINK-14538 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.9.0 Reporter: Shuwen Zhou Attachments: image-2019-10-28-12-34-53-413.png I'm collecting TaskManager status metrics to DataDog. While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1 !image-2019-10-28-12-34-53-413.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese
Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese URL: https://github.com/apache/flink/pull/9805#discussion_r339399783 ## File path: docs/dev/stream/state/checkpointing.zh.md ## @@ -173,30 +165,26 @@ Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see ## Selecting a State Backend Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339398802 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } + + final List exclusiveSegments = globalPool.requestMemorySegments(); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(0, globalPool.getNumberOfAvailableMemorySegments()); + + final AtomicBoolean completeFlag = new AtomicBoolean(false); + CompletableFuture availableFuture = globalPool.isAvailable(); + availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); + + // recycle one buffer + globalPool.recycle(segments.get(0)); + assertTrue(completeFlag.get()); + assertTrue(availableFuture.isDone()); + assertTrue(globalPool.isAvailable().isDone()); + assertEquals(1, 
globalPool.getNumberOfAvailableMemorySegments()); Review comment: ditto: we could extract a separate test for verifying the `globalPool.requestMemorySegments()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunhaibotb edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes
sunhaibotb edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes URL: https://github.com/apache/flink/pull/9854#issuecomment-546785241 I've addressed the latest comments and added some detailed descriptions to the commit messages. Thanks @zhijiangW. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339398713 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } Review comment: It would be better to extract a separate test that only verifies the logic of `globalPool.requestMemorySegment()`, e.g. before requesting the single segment the global pool is available, and after requesting it becomes unavailable. This test verifies many functions together, which makes it difficult to trace the logic and to maintain the code in the future.
[jira] [Comment Edited] (FLINK-10435) Client sporadically hangs after Ctrl + C
[ https://issues.apache.org/jira/browse/FLINK-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960738#comment-16960738 ] Yang Wang edited comment on FLINK-10435 at 10/28/19 4:14 AM: - I have tested multiple times and found the root cause. The yarn client is closed by `YarnClusterDescriptor#close` while it is still in use by `DeploymentFailureHook#failSessionDuringDeployment`. As a result, `yarnClient.killApplication` encounters an exception and retries according to the retry policy (30 times by default), so the retry process in the shutdown hook may take a long time. The Flink client main thread (CliFrontend->YarnClusterDescriptor) and the shutdown hook thread both need the yarn client, and we cannot guarantee the order in which they run. I suggest removing the `yarnClient.close()` in `failSessionDuringDeployment` and using a new yarn client in the shutdown hook. [~trohrmann], [~tison] I think this is a bug. What do you think? was (Author: fly_in_gis): I have tested multiple times and found the root cause. The yarn client is closed by `YarnClusterDescriptor#close` while it is still in use by `DeploymentFailureHook#failSessionDuringDeployment`. As a result, `yarnClient.killApplication` encounters an exception and retries according to the retry policy (30 times by default), so the retry process in the shutdown hook may take a long time. The Flink client main thread (CliFrontend->YarnClusterDescriptor) and the shutdown hook thread both need the yarn client, and we cannot guarantee the order in which they run. I suggest removing the `yarnClient.close()` in `failSessionDuringDeployment` and using a new yarn client in the shutdown hook. [~trohrmann], I think this is a bug. What do you think? 
> Client sporadically hangs after Ctrl + C > > > Key: FLINK-10435 > URL: https://issues.apache.org/jira/browse/FLINK-10435 > Project: Flink > Issue Type: Bug > Components: Command Line Client, Deployment / YARN >Affects Versions: 1.5.5, 1.6.2, 1.7.0, 1.9.1 >Reporter: Gary Yao >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > When submitting a YARN job cluster in attached mode, the client hangs > indefinitely if Ctrl + C is pressed at the right time. One can recover from > this by sending SIGKILL. > *Command to submit job* > {code} > HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster > examples/streaming/WordCount.jar > {code} > > *Output/Stacktrace* > {code} > [hadoop@ip-172-31-45-22 flink-1.5.4]$ HADOOP_CLASSPATH=`hadoop classpath` > bin/flink run -m yarn-cluster examples/streaming/WordCount.jar > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/hadoop/flink-1.5.4/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-09-26 12:01:04,241 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at > ip-172-31-45-22.eu-central-1.compute.internal/172.31.45.22:8032 > 2018-09-26 12:01:04,386 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-26 12:01:04,386 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. 
Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-26 12:01:04,402 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-09-26 12:01:04,598 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=1024, > taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} > 2018-09-26 12:01:04,972 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The > configuration directory ('/home/hadoop/flink-1.5.4/conf') contains both LOG4J > and Logback configuration files. Please delete or rename one of them. > 2018-09-26 12:01:07,857 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting > application master application_1537944258063_0017 > 2018-09-26 12:01:07,913
[GitHub] [flink] wangyang0918 opened a new pull request #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C
wangyang0918 opened a new pull request #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C URL: https://github.com/apache/flink/pull/10010

## What is the purpose of the change

This PR fixes a bug where the Flink client hangs after `Ctrl + C` when deploying on YARN in attached mode. More information can be found at [FLINK-10435](https://issues.apache.org/jira/browse/FLINK-10435).

## Brief change log

* Remove the `yarnClient.close()` in `failSessionDuringDeployment` and use a new yarn client in `DeploymentFailureHook`

## Verifying this change

This change can be verified as follows:

* Manually run a command to start a Flink cluster on YARN in attached mode: `./bin/flink run -m yarn-cluster examples/streaming/WindowJoin.jar`
* When the application is in ACCEPTED status, press `Ctrl + C`.
* The Flink client exits immediately.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
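The change described in this pull request can be illustrated with a minimal, self-contained sketch. It does not use the real YARN or Flink classes: `FakeClient`, its `killApplication` method and `ShutdownHookSketch` are hypothetical stand-ins, assumed only for illustration. The idea shown is that a cleanup path which may run concurrently with the main thread's `close()` creates and closes its own client instead of sharing one whose lifecycle it does not control.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a client with a lifecycle; NOT the real YarnClient API.
class FakeClient implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void killApplication(String appId) {
        if (closed.get()) {
            // In this toy model a closed client simply fails; a real client might retry instead.
            throw new IllegalStateException("client already closed, cannot kill " + appId);
        }
        // Pretend the application was killed.
    }

    @Override
    public void close() {
        closed.set(true);
    }
}

class ShutdownHookSketch {
    // Problematic pattern: the cleanup reuses a client that another thread may already have closed.
    static boolean killWithSharedClient(FakeClient shared, String appId) {
        try {
            shared.killApplication(appId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // Suggested pattern: the cleanup owns a fresh client and closes it itself.
    static boolean killWithOwnClient(String appId) {
        try (FakeClient own = new FakeClient()) {
            own.killApplication(appId);
            return true;
        }
    }
}
```

Under these assumptions, such a cleanup could be registered with `Runtime.getRuntime().addShutdownHook(new Thread(() -> ShutdownHookSketch.killWithOwnClient(appId)))`, where `appId` is whatever identifier the caller holds.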
[jira] [Updated] (FLINK-10435) Client sporadically hangs after Ctrl + C
[ https://issues.apache.org/jira/browse/FLINK-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10435: --- Labels: pull-request-available (was: ) > Client sporadically hangs after Ctrl + C > > > Key: FLINK-10435 > URL: https://issues.apache.org/jira/browse/FLINK-10435 > Project: Flink > Issue Type: Bug > Components: Command Line Client, Deployment / YARN >Affects Versions: 1.5.5, 1.6.2, 1.7.0, 1.9.1 >Reporter: Gary Yao >Priority: Major > Labels: pull-request-available > > When submitting a YARN job cluster in attached mode, the client hangs > indefinitely if Ctrl + C is pressed at the right time. One can recover from > this by sending SIGKILL. > *Command to submit job* > {code} > HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster > examples/streaming/WordCount.jar > {code} > > *Output/Stacktrace* > {code} > [hadoop@ip-172-31-45-22 flink-1.5.4]$ HADOOP_CLASSPATH=`hadoop classpath` > bin/flink run -m yarn-cluster examples/streaming/WordCount.jar > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/hadoop/flink-1.5.4/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-09-26 12:01:04,241 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at > ip-172-31-45-22.eu-central-1.compute.internal/172.31.45.22:8032 > 2018-09-26 12:01:04,386 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. 
Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-26 12:01:04,386 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-26 12:01:04,402 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-09-26 12:01:04,598 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=1024, > taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1} > 2018-09-26 12:01:04,972 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The > configuration directory ('/home/hadoop/flink-1.5.4/conf') contains both LOG4J > and Logback configuration files. Please delete or rename one of them. 
> 2018-09-26 12:01:07,857 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting > application master application_1537944258063_0017 > 2018-09-26 12:01:07,913 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted > application application_1537944258063_0017 > 2018-09-26 12:01:07,913 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for > the cluster to be allocated > 2018-09-26 12:01:07,916 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying > cluster, current state ACCEPTED > ^C2018-09-26 12:01:08,851 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cancelling > deployment from Deployment Failure Hook > 2018-09-26 12:01:08,854 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Killing YARN > application > > The program finished with the following exception: > org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't > deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:410) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:258) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at >
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339397642 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } + + final List exclusiveSegments = globalPool.requestMemorySegments(); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(0, globalPool.getNumberOfAvailableMemorySegments()); + + final AtomicBoolean completeFlag = new AtomicBoolean(false); + CompletableFuture availableFuture = globalPool.isAvailable(); + availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); + + // recycle one buffer + globalPool.recycle(segments.get(0)); + assertTrue(completeFlag.get()); + assertTrue(availableFuture.isDone()); + assertTrue(globalPool.isAvailable().isDone()); Review comment: Even we could 
further remove below: ``` final AtomicBoolean completeFlag = new AtomicBoolean(false); availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); assertTrue(completeFlag.get()); ``` Before recycling, the global pool is not available; after recycling, it becomes available, which `assertTrue(globalPool.isAvailable().isDone())` checks. That is already enough to verify our logic changes. There is no value in verifying the action taken when the future completes (`completeFlag.set(true)`), because in the real code path the action is actually different.
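To make the argument in this comment concrete, here is a small sketch that uses only `java.util.concurrent.CompletableFuture`. `TinyPool` is a hypothetical toy, not Flink's `NetworkBufferPool`; it only mimics the idea of an availability future that is completed when a segment is recycled. It suggests that checking `isDone()` before and after recycling already covers the state change, without an extra callback flag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy pool; it illustrates the availability-future idea only.
class TinyPool {
    private final Deque<byte[]> free = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    TinyPool(int segments) {
        for (int i = 0; i < segments; i++) {
            free.add(new byte[16]);
        }
        if (!free.isEmpty()) {
            available.complete(null);
        }
    }

    // Returns a segment, or null when the pool is empty.
    byte[] request() {
        byte[] segment = free.poll();
        if (free.isEmpty() && available.isDone()) {
            // The last segment was handed out: later callers see a fresh, incomplete future.
            available = new CompletableFuture<>();
        }
        return segment;
    }

    void recycle(byte[] segment) {
        free.add(segment);
        available.complete(null); // no effect if the future is already complete
    }

    CompletableFuture<Void> isAvailable() {
        return available;
    }
}
```

A test against this toy would assert `isAvailable().isDone()` is false after the last `request()` and true after `recycle(...)`, which is the same shape as the assertions the reviewer proposes to keep.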
[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever URL: https://github.com/apache/flink/pull/9950#discussion_r339284001 ## File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java ## @@ -312,6 +381,60 @@ private static void generateRandomDirs(File dir, int numFiles, int numDirs, int } } + /** +* Generate some files in the directory {@code dir}. +* @param dir the directory where the files are generated +* @return Tuple3 holding the generated files' absolute path, relative to the working directory path and relative +* url. +* @throws IOException if I/O error occurs while generating the files +*/ + public static Tuple3, Collection, Collection> prepareTestFiles( + final java.nio.file.Path dir) throws IOException { + + Tuple3, Collection, Collection> result = new Tuple3<>(); + + result.f0 = generateSomeFilesInDirectoryReturnJarFiles(dir); + result.f1 = toRelativeFiles(result.f0); + result.f2 = toRelativeURLs(result.f1); + + return result; + } + + private static Collection generateSomeFilesInDirectoryReturnJarFiles( + final java.nio.file.Path dir) throws IOException { + + final java.nio.file.Path jobSubDir1 = Files.createDirectory(dir.resolve("_sub_dir1")); + final java.nio.file.Path jobSubDir2 = Files.createDirectory(dir.resolve("_sub_dir2")); + final java.nio.file.Path jarFile1 = Files.createFile(dir.resolve("file1.jar")); + final java.nio.file.Path jarFile2 = Files.createFile(dir.resolve("file2.jar")); + final java.nio.file.Path jarFile3 = Files.createFile(jobSubDir1.resolve("file3.jar")); + final java.nio.file.Path jarFile4 = Files.createFile(jobSubDir2.resolve("file4.jar")); + final Collection jarFiles = new ArrayList<>(); + + Files.createFile(dir.resolve("file1.txt")); + Files.createFile(jobSubDir2.resolve("file2.txt")); + + jarFiles.add(jarFile1.toFile()); + jarFiles.add(jarFile2.toFile()); + jarFiles.add(jarFile3.toFile()); + jarFiles.add(jarFile4.toFile()); + 
return jarFiles; + } + + private static Collection toRelativeFiles(Collection files) { + final java.nio.file.Path workingDir = Paths.get(System.getProperty("user.dir")); + final Collection relativeFiles = new ArrayList<>(); + files.forEach(file -> relativeFiles.add(workingDir.relativize(file.toPath()).toFile())); + return relativeFiles; + } + + private static Collection toRelativeURLs(Collection relativeFiles) throws MalformedURLException { + final Collection relativeURLs = new ArrayList<>(); + final URL context = new URL(relativeFiles.iterator().next().toURI().getScheme() + ":"); + relativeFiles.forEach(FunctionUtils.uncheckedConsumer(file -> relativeURLs.add(new URL(context, file.toString(); Review comment: maybe file.toString() -> file.getPath()? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
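For context on the suggestion in this comment: `java.io.File#toString()` is documented to return the same string as `getPath()`, so the proposed change would affect readability rather than behavior. A minimal check, using a hypothetical class name and an arbitrary relative path:

```java
import java.io.File;

// Hypothetical helper, for illustration only.
class FilePathSketch {
    // Returns true when toString() and getPath() yield the same string for the given file.
    static boolean samePathString(File file) {
        return file.toString().equals(file.getPath());
    }
}
```

Calling `FilePathSketch.samePathString(new File("_sub_dir1/file3.jar"))` exercises the comparison on a relative path like the ones produced by `toRelativeFiles`.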
[GitHub] [flink] flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions
flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions URL: https://github.com/apache/flink/pull/10007#issuecomment-546673889 ## CI report: * 2f64299fb74a59f77f14f20bed87191389f58317 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133706395) * 5b16730a6403a9cffdc564924be89121561f2e82 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133755954) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339397163 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); + assertTrue(globalPool.isAvailable().isDone()); + } + + final List exclusiveSegments = globalPool.requestMemorySegments(); + assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size()); + + assertFalse(globalPool.isAvailable().isDone()); + assertEquals(0, globalPool.getNumberOfAvailableMemorySegments()); + + final AtomicBoolean completeFlag = new AtomicBoolean(false); + CompletableFuture availableFuture = globalPool.isAvailable(); + availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true)); + + // recycle one buffer + globalPool.recycle(segments.get(0)); + assertTrue(completeFlag.get()); + assertTrue(availableFuture.isDone()); + assertTrue(globalPool.isAvailable().isDone()); Review comment: ``` 
assertTrue(availableFuture.isDone()); assertTrue(globalPool.isAvailable().isDone()); ``` These two assertions check the same thing, so we could remove one of them.
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339396841 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( Review comment: We could use the simple constructor `NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest)` for tests instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.
zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool. URL: https://github.com/apache/flink/pull/9993#discussion_r339396836 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ## @@ -518,4 +528,176 @@ public void go() throws Exception { globalPool.destroy(); } } + + /** +* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is +* correctly maintained and the future callback is correctly processed. +*/ + @Test + public void testBufferAvailabilityAndFutureCallback() throws Exception { + final int numBuffers = 10; + final int numberOfSegmentsToRequest = 5; + final Duration requestSegmentsTimeout = Duration.ofSeconds(10L); + + final NetworkBufferPool globalPool = new NetworkBufferPool( + numBuffers, + 128, + numberOfSegmentsToRequest, + requestSegmentsTimeout); + + try { + assertTrue(globalPool.isAvailable().isDone()); + + List segments = new ArrayList<>(numberOfSegmentsToRequest); + for (int i = 0; i < numberOfSegmentsToRequest; ++i) { + MemorySegment segment = globalPool.requestMemorySegment(); + assertNotNull(segment); + segments.add(segment); Review comment: `assertNotNull(segment)` could be done via `segments.add(checkNotNull(segment))`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339396074 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/UnresolvedIdentifier.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Identifier of an object, such as table, view, function or type in a catalog. This identifier + * cannot be used directly to access an object in a {@link CatalogManager}, but has to be first + * fully resolved into {@link ObjectIdentifier}. + */ +@PublicEvolving +public class UnresolvedIdentifier { Review comment: Some thoughts about this class: 1. 
Should it be `@Internal`? I can't find any public interface or class referencing it yet. Otherwise, I would suggest putting it in `flink-table-common`. 2. I am a bit confused by the prefix `Unresolved`. Normally we would do some validation to turn it into a "resolved" identifier, but according to the implementation we just expand the path. In my opinion, that doesn't sound like it has been resolved yet.
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339395859 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +import java.util.Collections + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. + * The original limit will still be retained. + * + * The reasons why the limit still be retained: + * 1.If the source is required to return the exact number of limit number, the implementation + * of the source is highly required. The source is required to accurately control the record + * number of split, and the parallelism setting also need to be adjusted accordingly. + * 2.When remove the limit, maybe filter will be pushed down to the source after limit pushed + * down. The source need know it should do limit first and do the filter later, it is hard to + * implement. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. 
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && Review comment: I think having an offset doesn't matter, because the planner will apply the offset anyway (just as it will apply the limit anyway). Say we have a `limit 10 offset 1`: I think we can push down `limit 11` and do the `limit 10 offset 1` after the source.
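The arithmetic behind this suggestion can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `OffsetLimitSketch`, `pushedDownLimit`, and `applyOffsetAndFetch` are hypothetical names that do not exist in Flink or Calcite, and plain lists stand in for the source and for the retained sort/limit node.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not Flink API): pushing `offset + fetch` into a source. */
final class OffsetLimitSketch {

    /**
     * Rows the source must be allowed to produce so that the planner can
     * still skip `offset` rows and keep `fetch` rows afterwards.
     */
    static long pushedDownLimit(long offset, long fetch) {
        // addExact throws on overflow instead of silently wrapping around.
        return Math.addExact(offset, fetch);
    }

    /** Stand-in for the retained planner-side node: skip `offset`, keep `fetch`. */
    static <T> List<T> applyOffsetAndFetch(List<T> rows, long offset, long fetch) {
        return rows.stream().skip(offset).limit(fetch).collect(Collectors.toList());
    }
}
```

For `limit 10 offset 1`, `pushedDownLimit(1, 10)` gives 11, and applying the original offset and fetch to those 11 rows yields the expected 10.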
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339389965 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; + +/** + * Adds support for limiting push-down to a {@link TableSource}. + * A {@link TableSource} extending this interface is able to limit the number of records. + * + * After pushing down, source only needs to try its best to limit the number of output records, + * but does not need to guarantee that the number must be less than or equal to the limit. Review comment: Could you add more description here about split limit and planner guarantee (they are mentioned in the rule javadoc)? I think these are useful to users.
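A small sketch may help show why the planner keeps its own limit when the source is only best-effort. It assumes, purely for illustration, that each split truncates independently; `BestEffortLimitSketch` and its methods are hypothetical and are not part of the `LimitableTableSource` interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: best-effort source limit plus a planner guarantee. */
final class BestEffortLimitSketch {

    /**
     * Assumption: each split applies the limit on its own, so the source as a
     * whole may emit up to (number of splits * limit) rows.
     */
    static List<Integer> readWithPerSplitLimit(List<List<Integer>> splits, int limit) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> split : splits) {
            out.addAll(split.subList(0, Math.min(limit, split.size())));
        }
        return out;
    }

    /** Stand-in for the limit node the planner retains above the scan. */
    static List<Integer> plannerLimit(List<Integer> rows, int limit) {
        return new ArrayList<>(rows.subList(0, Math.min(limit, rows.size())));
    }
}
```

With two splits of three rows each and a limit of 2, the source returns four rows; only the retained planner limit brings the result down to exactly two.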
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339392752 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +import java.util.Collections + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. + * The original limit will still be retained. + * + * The reasons why the limit still be retained: + * 1.If the source is required to return the exact number of limit number, the implementation + * of the source is highly required. The source is required to accurately control the record + * number of split, and the parallelism setting also need to be adjusted accordingly. + * 2.When remove the limit, maybe filter will be pushed down to the source after limit pushed + * down. The source need know it should do limit first and do the filter later, it is hard to + * implement. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. 
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && +fetch != null + +var supportPushDown = false +if (onlyLimit) { + supportPushDown = call.rel(1).asInstanceOf[TableScan] + .getTable.unwrap(classOf[TableSourceTable[_]]) match { +case table: TableSourceTable[_] => + table.tableSource match { +case source: LimitableTableSource[_] => !source.isLimitPushedDown +case _ => false + } +case _ => false + } +} +supportPushDown + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val sort = call.rel(0).asInstanceOf[Sort] +val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] +val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable] +val limit = RexLiteral.intValue(sort.fetch) +val relBuilder = call.builder() +val newRelOptTable = applyLimit(limit, relOptTable, relBuilder) +val newScan = scan.copy(scan.getTraitSet, newRelOptTable) + +val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource +val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource + +if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown +&& newTableSource.explainSource().equals(oldTableSource.explainSource)) { + throw new TableException("Failed to push limit into table source! " + + "table source with pushdown capability must override and change " + + "explainSource() API to explain the pushdown applied!") +} + +call.transformTo(sort.copy(sort.getTraitSet, Collections.singletonList(newScan)))
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339395422 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.stats.TableStats +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan} +import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} +import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.LimitableTableSource + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.{Sort, TableScan} +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.tools.RelBuilder + +import java.util.Collections + +/** + * Planner rule that tries to push limit into a [[LimitableTableSource]]. + * The original limit will still be retained. + * + * The reasons why the limit still be retained: + * 1.If the source is required to return the exact number of limit number, the implementation + * of the source is highly required. The source is required to accurately control the record + * number of split, and the parallelism setting also need to be adjusted accordingly. + * 2.When remove the limit, maybe filter will be pushed down to the source after limit pushed + * down. The source need know it should do limit first and do the filter later, it is hard to + * implement. + */ +class PushLimitIntoTableSourceScanRule extends RelOptRule( + operand(classOf[FlinkLogicalSort], +operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val sort = call.rel(0).asInstanceOf[Sort] +val fetch = sort.fetch +val offset = sort.offset +// Only push-down the limit whose offset equal zero. Because it is difficult to source based +// push to handle the non-zero offset. And the non-zero offset usually appear together with +// sort. 
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty && +(offset == null || RexLiteral.intValue(offset) == 0) && +fetch != null + +var supportPushDown = false +if (onlyLimit) { + supportPushDown = call.rel(1).asInstanceOf[TableScan] + .getTable.unwrap(classOf[TableSourceTable[_]]) match { +case table: TableSourceTable[_] => + table.tableSource match { +case source: LimitableTableSource[_] => !source.isLimitPushedDown +case _ => false + } +case _ => false + } +} +supportPushDown + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val sort = call.rel(0).asInstanceOf[Sort] +val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan] +val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable] +val limit = RexLiteral.intValue(sort.fetch) +val relBuilder = call.builder() +val newRelOptTable = applyLimit(limit, relOptTable, relBuilder) +val newScan = scan.copy(scan.getTraitSet, newRelOptTable) + +val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource +val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource + +if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown +&& newTableSource.explainSource().equals(oldTableSource.explainSource)) { + throw new TableException("Failed to push limit into table source! " Review comment: Actually, I have some concerns about this. I also noticed we do similar things in the filter and projection push-down rules. However, I think it pushes some dirty work to connectors and makes connectors more difficult to
[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit
wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit URL: https://github.com/apache/flink/pull/9876#discussion_r339393494 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.io.CollectionInputFormat +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.sources._ +import org.apache.flink.types.Row + +import scala.collection.JavaConverters._ + +/** + * The table source which support push-down the limit to the source. 
+ */ +class TestLimitableTableSource( +data: Seq[Row], +rowType: RowTypeInfo, +var limit: Long = -1, +var limitablePushedDown: Boolean = false) + extends StreamTableSource[Row] + with LimitableTableSource[Row] { + + override def isBounded = true + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +val dataSet = if (limit > 0) { + data.take(limit.toInt).asJava +} else { + data.asJava +} +execEnv.createInput( + new CollectionInputFormat(dataSet, rowType.createSerializer(new ExecutionConfig)), + rowType) + } + + override def applyLimit(limit: Long): TableSource[Row] = { +new TestLimitableTableSource(data, rowType, limit, limitablePushedDown) + } + + override def isLimitPushedDown: Boolean = limitablePushedDown + + override def getReturnType: TypeInformation[Row] = rowType + + override def explainSource(): String = { +if (limit > 0 && limit < Long.MaxValue) { + "limit: " + limit +} else if (limitablePushedDown) { + "limitablePushedDown" Review comment: I think we will never reach here, right?
[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339395831 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -501,6 +537,11 @@ public void alterTable(CatalogBaseTable table, ObjectIdentifier objectIdentifier * does not exist. */ public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + if (temporaryTables.containsKey(objectIdentifier)) { Review comment: Just curious about this, so I took a look at Spark: ``` * If a database is specified in `name`, this will drop the table from that database. * If no database is specified, this will first attempt to drop a temporary view with * the same name, then, if that does not exist, drop the table from the current database. ``` I'm also curious about other databases; I don't know how they behave.
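The resolution order described in the quoted Spark comment can be sketched as below. This is a toy model only: `DropTableSketch` is a hypothetical class, the maps stand in for real catalogs, and nothing here reflects how Flink's `CatalogManager` or Spark's catalog is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of "temporary first, then current database" drop semantics. */
final class DropTableSketch {
    final Map<String, String> temporaryViews = new HashMap<>();
    // Permanent tables are keyed by "database.table" in this sketch.
    final Map<String, String> permanentTables = new HashMap<>();
    String currentDatabase = "default";

    /** Returns which kind of object was dropped: "temporary", "permanent", or "none". */
    String dropTable(String database, String name) {
        if (database != null) {
            // An explicit database bypasses temporary views entirely.
            return permanentTables.remove(database + "." + name) != null ? "permanent" : "none";
        }
        if (temporaryViews.remove(name) != null) {
            return "temporary";
        }
        return permanentTables.remove(currentDatabase + "." + name) != null ? "permanent" : "none";
    }
}
```

Dropping an unqualified name that exists both as a temporary view and as a permanent table removes the temporary one first; a second drop then removes the permanent table.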
[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339393920 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java ## @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) { * Searches for the specified table source, configures it accordingly, and registers it as * a table under the given name. * +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. +* * @param name table name to be registered in the table environment +* @deprecated use {@link #createTemporaryTable(String)} */ + @Deprecated public void registerTableSource(String name) { Preconditions.checkNotNull(name); TableSource tableSource = TableFactoryUtil.findAndCreateTableSource(this); - tableEnv.registerTableSource(name, tableSource); + registration.createTableSource(name, tableSource); } /** * Searches for the specified table sink, configures it accordingly, and registers it as * a table under the given name. * +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. 
+* * @param name table name to be registered in the table environment +* @deprecated use {@link #createTemporaryTable(String)} */ + @Deprecated public void registerTableSink(String name) { Preconditions.checkNotNull(name); TableSink tableSink = TableFactoryUtil.findAndCreateTableSink(this); - tableEnv.registerTableSink(name, tableSink); + registration.createTableSink(name, tableSink); } /** * Searches for the specified table source and sink, configures them accordingly, and registers * them as a table under the given name. * +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. +* * @param name table name to be registered in the table environment +* @deprecated use {@link #createTemporaryTable(String)} */ + @Deprecated public void registerTableSourceAndSink(String name) { registerTableSource(name); registerTableSink(name); } + /** +* Registers the table described by underlying properties in a given path. +* +* There is no distinction between source and sink at the descriptor level anymore as this +* method does not perform actual class lookup. It only stores the underlying properties. The +* actual source/sink lookup is performed when the table is used. +* +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. +* +* NOTE: The schema must be explicitly defined. +* +* @param path path where to register the temporary table +*/ + public void createTemporaryTable(String path) { + if (schemaDescriptor == null) { + throw new TableException( + "Table schema must be explicitly defined. 
To derive schema from the underlying connector" + + " use registerTableSource/registerTableSink/registerTableSourceAndSink."); + } + + Map schemaProperties = schemaDescriptor.toProperties(); + TableSchema tableSchema = getTableSchema(schemaProperties); + + Map properties = new HashMap<>(toProperties()); + schemaProperties.keySet().forEach(properties::remove); + + CatalogTableImpl catalogTable = new CatalogTableImpl( Review comment: `CatalogTable` has `partitionKeys` too. Could we consider adding partition keys in `CatalogTableImpl.toProperties` and parsing them here? (We can also add this later.)
[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339392415 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala ## @@ -73,7 +74,14 @@ abstract class TableEnvImpl( new TableReferenceLookup { override def lookupTable(name: String): Optional[TableReferenceExpression] = { JavaScalaConversionUtil - .toJava(scanInternal(Array(name)).map(t => new TableReferenceExpression(name, t))) + .toJava( +Try({ + val unresolvedIdentifier = UnresolvedIdentifier.of(name) Review comment: Why use `Try`? If the name is illegal, should we throw an exception?
[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339392857 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala ## @@ -363,15 +371,26 @@ abstract class TableEnvImpl( @throws[TableException] override def scan(tablePath: String*): Table = { -scanInternal(tablePath.toArray) match { +val unresolvedIdentifier = UnresolvedIdentifier.of(tablePath: _*) +scanInternal(unresolvedIdentifier) match { case Some(table) => createTable(table) case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") } } - private[flink] def scanInternal(tablePath: Array[String]): Option[CatalogQueryOperation] = { -val unresolvedIdentifier = UnresolvedIdentifier.of(tablePath: _*) -val objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier) + override def from(path: String): Table = { +val parser = planningConfigurationBuilder.createCalciteParser() +val unresolvedIdentifier = UnresolvedIdentifier.of(parser.parseIdentifier(path).names: _*) +scanInternal(unresolvedIdentifier) match { + case Some(table) => createTable(table) + case None => throw new TableException(s"Table '$path' was not found.") Review comment: Can we have a unified exception message? Maybe use `unresolvedIdentifier`?
[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339393982 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java ## @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) { * Searches for the specified table source, configures it accordingly, and registers it as * a table under the given name. * +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. +* * @param name table name to be registered in the table environment +* @deprecated use {@link #createTemporaryTable(String)} */ + @Deprecated public void registerTableSource(String name) { Preconditions.checkNotNull(name); TableSource tableSource = TableFactoryUtil.findAndCreateTableSource(this); - tableEnv.registerTableSource(name, tableSource); + registration.createTableSource(name, tableSource); } /** * Searches for the specified table sink, configures it accordingly, and registers it as * a table under the given name. * +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. 
+* * @param name table name to be registered in the table environment +* @deprecated use {@link #createTemporaryTable(String)} */ + @Deprecated public void registerTableSink(String name) { Preconditions.checkNotNull(name); TableSink tableSink = TableFactoryUtil.findAndCreateTableSink(this); - tableEnv.registerTableSink(name, tableSink); + registration.createTableSink(name, tableSink); } /** * Searches for the specified table source and sink, configures them accordingly, and registers * them as a table under the given name. * +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. +* * @param name table name to be registered in the table environment +* @deprecated use {@link #createTemporaryTable(String)} */ + @Deprecated public void registerTableSourceAndSink(String name) { registerTableSource(name); registerTableSink(name); } + /** +* Registers the table described by underlying properties in a given path. +* +* There is no distinction between source and sink at the descriptor level anymore as this +* method does not perform actual class lookup. It only stores the underlying properties. The +* actual source/sink lookup is performed when the table is used. +* +* Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will +* be inaccessible in the current session. To make the permanent object available again you can drop the +* corresponding temporary object. +* +* NOTE: The schema must be explicitly defined. +* +* @param path path where to register the temporary table +*/ + public void createTemporaryTable(String path) { + if (schemaDescriptor == null) { + throw new TableException( + "Table schema must be explicitly defined. 
To derive schema from the underlying connector" + + " use registerTableSource/registerTableSink/registerTableSourceAndSink."); + } + + Map schemaProperties = schemaDescriptor.toProperties(); + TableSchema tableSchema = getTableSchema(schemaProperties); + + Map properties = new HashMap<>(toProperties()); + schemaProperties.keySet().forEach(properties::remove); + + CatalogTableImpl catalogTable = new CatalogTableImpl( Review comment: Maybe we can use `CatalogTableBuilder` here? It looks like there are some bugs in `CatalogTableBuilder`: it does not remove `schemaProperties` from `properties`?
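The step under discussion, separating schema properties from the remaining connector properties, can be sketched in isolation. This is a minimal illustration assuming plain `Map<String, String>` properties; `SchemaPropertySplitSketch` and the property keys used with it are hypothetical, and the sketch says nothing about what `CatalogTableBuilder` actually does.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper mirroring `schemaProperties.keySet().forEach(properties::remove)`. */
final class SchemaPropertySplitSketch {

    /** Returns a copy of `all` without any key that also appears in `schema`. */
    static Map<String, String> withoutSchemaKeys(Map<String, String> all, Map<String, String> schema) {
        // Copy first so the caller's map is left untouched.
        Map<String, String> rest = new HashMap<>(all);
        schema.keySet().forEach(rest::remove);
        return rest;
    }
}
```

If this removal is skipped, the schema ends up stored twice: once as the table schema and once inside the generic property map.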
[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774 ## CI report: * c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131080663) * ec84f32bdcae0738d97ac60090691789334a279a : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133306001) * 6a12e5941baf73958146f54365ff3f267c696e11 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133310272) * 4318f07b5554bcf1abc44e912345927c65c8f8a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133323118) * 052e4580c08f9723e2716dedac7d26d031c4b538 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366819) * 27cd52fadf153982bc9771ba27a436caeebb17b8 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133480918) * f44e43be2046b4d86a86891bd16189e91ddbe2ee : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert
flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert URL: https://github.com/apache/flink/pull/9796#issuecomment-536253430 ## CI report: * 3b84408bb5ee764b1a64103b1f4df3e0ceacaba7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129578380) * 681339af85489fc3fb2274371067aa2de269e621 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129582634) * 61c5f74d60d7673ccb8e4d57812bf1d95c876f1f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131283509) * 73d33a1b8588776741c8ee1a5030b10ea7cdb580 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133751387) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions
flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions URL: https://github.com/apache/flink/pull/10007#issuecomment-546673889 ## CI report: * 2f64299fb74a59f77f14f20bed87191389f58317 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133706395) * 5b16730a6403a9cffdc564924be89121561f2e82 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects
KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects URL: https://github.com/apache/flink/pull/9971#discussion_r339394157 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala ## @@ -179,15 +154,7 @@ class FlinkPlannerImpl( schemaPath: util.List[String], viewPath: util.List[String]): RelRoot = { - val parser: SqlParser = SqlParser.create(queryString, parserConfig) - var sqlNode: SqlNode = null - try { -sqlNode = parser.parseQuery - } - catch { -case e: CSqlParseException => - throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e) - } + val sqlNode: SqlNode = parser.parse(queryString) Review comment: Is it intended to re-use the parser here? I noticed that in `PlannerBase::parser` you pointed out that you didn't want to reuse the parser since the config might have changed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#issuecomment-546558980 ## CI report: * 74748e2c09f1b2b0b9bfb3c1850cd71696d30df8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133648556) * 407af42116be8c51f02403660e00e9f490bd7454 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133751372) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9915: [FLINK-14415][table-common] ValueLiteralExpression#equals should take array value into account
flinkbot edited a comment on issue #9915: [FLINK-14415][table-common] ValueLiteralExpression#equals should take array value into account URL: https://github.com/apache/flink/pull/9915#issuecomment-542752172 ## CI report: * d6f66852e76bac070b37e7e9c7365423e5b66853 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/132183809) * 2e16ed14f272922207b947b6b16befbda74520d8 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9804: [FLINK-14239] Fix the max watermark in StreamSource may arrive the downstream operator early
flinkbot edited a comment on issue #9804: [FLINK-14239] Fix the max watermark in StreamSource may arrive the downstream operator early URL: https://github.com/apache/flink/pull/9804#issuecomment-536287116 ## CI report: * cb5b4da4d8b5d877e296dd84095a328208263c15 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129588487) * 2906e837e6a7f70dda5bc112658b51157801150e : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/129624664) * 9e84fcb1dc2b92d87010bbc8ba92fd9a508a27d5 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129632305) * 0c2e92b518e88c88b90c876cc8d20e688c51cecc : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/130935417) * 6780c04adab7c716882706600c1d3ff1af118b5b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/131060082) * a490f41f23e59f4e9a4c10e6c7c5ddf66e25737b : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert
flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert URL: https://github.com/apache/flink/pull/9796#issuecomment-536253430 ## CI report: * 3b84408bb5ee764b1a64103b1f4df3e0ceacaba7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129578380) * 681339af85489fc3fb2274371067aa2de269e621 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129582634) * 61c5f74d60d7673ccb8e4d57812bf1d95c876f1f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131283509) * 73d33a1b8588776741c8ee1a5030b10ea7cdb580 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133751387) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
wuchong commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#discussion_r339387673 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala ## @@ -505,7 +503,7 @@ object AggregateUtil extends Enumeration { case VARCHAR | CHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE) Review comment: +1 I think this is legacy code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
JingsongLi commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#discussion_r339387049 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala ## @@ -505,7 +503,7 @@ object AggregateUtil extends Enumeration { case VARCHAR | CHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE) Review comment: Can you change this to `DataTypes.VARCHAR(precision).bridgeTo(BinaryString.class)`? I think that is the right way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
JingsongLi commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#discussion_r339387049 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala ## @@ -505,7 +503,7 @@ object AggregateUtil extends Enumeration { case VARCHAR | CHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE) Review comment: Can you change this to `DataTypes.VARCHAR(precision).bridgeTo(BinaryString.class)` and `DataTypes.CHAR(precision).bridgeTo(BinaryString.class)`? I think that is the right way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tony810430 commented on issue #9824: [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when ena
tony810430 commented on issue #9824: [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS URL: https://github.com/apache/flink/pull/9824#issuecomment-546769755 hi @becketqin, do you have free time to review this PR? I have done the test as you suggested. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun commented on issue #9972: [FLINK-14496][client] Exclude detach flag from ClusterClient
TisonKun commented on issue #9972: [FLINK-14496][client] Exclude detach flag from ClusterClient URL: https://github.com/apache/flink/pull/9972#issuecomment-546768430 Hi @aljoscha & @kl0u , I'm glad to hear your ideas :P This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on issue #9915: [FLINK-14415][table-common] ValueLiteralExpression#equals should take array value into account
wuchong commented on issue #9915: [FLINK-14415][table-common] ValueLiteralExpression#equals should take array value into account URL: https://github.com/apache/flink/pull/9915#issuecomment-546768292 Thanks @dawidwys , I will update and merge this then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-14535) Fix distinct key type for DecimalType in DistinctInfo
[ https://issues.apache.org/jira/browse/FLINK-14535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-14535: --- Assignee: Zhenghua Gao > Fix distinct key type for DecimalType in DistinctInfo > - > > Key: FLINK-14535 > URL: https://issues.apache.org/jira/browse/FLINK-14535 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.9.1 >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > DecimalType in DistinctInfo bridged to wrong external BigDecimal type, which > causes failures count distinct on decimal type. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wuchong edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
wuchong edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#issuecomment-546767439 Hi @docete, thanks for the contribution. The change looks good to me. I think we can improve the tests a bit more. Could you add an integration test that tests count distinct on all types, so that we can merge `testTimeDistinct`, `testDateDistinct`, `testTimestampDistinct`, and `testCountDistinct` into one? We should also add the same test in `SplitAggregateITCase`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
wuchong commented on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#issuecomment-546767439 Hi @docete, thanks for the contribution. The change looks good to me. I think we can improve the tests a bit more. Could you add an integration test that tests count distinct on all types, so that we can merge `testTimeDistinct`, `testDateDistinct`, `testTimestampDistinct`, and `testCountDistinct` into one? We should also add the same test in `SplitAggregateITCase`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever URL: https://github.com/apache/flink/pull/9950#discussion_r339385051 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.util.FileUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on. 
+ */ +public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever { + + /* A collection of relative jar paths to the working directory */ + private final List userClassPaths; + + protected AbstractUserClassPathJobGraphRetriever(@Nullable final File jobDir) throws IOException { + if (jobDir == null) { + userClassPaths = Collections.emptyList(); + } else { + final Collection jarFiles = FileUtils.listFilesInPath(jobDir, file -> file.getName().endsWith(".jar")); + final Collection relativeFiles = FileUtils.relativizeToWorkingDir(jarFiles); + this.userClassPaths = new ArrayList<>(FileUtils.toRelativeURLs(relativeFiles)); Review comment: Or do we need a specified rule to order the classpaths if we'd like it to be List? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
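The ordering question in the review comment above can be illustrated with a small sketch. It does not use Flink's `FileUtils`; the class `OrderedJarListingSketch` and the method `listJarsSorted` are hypothetical names, and the recursive directory walk is an assumption about how jars are discovered. It shows one possible rule: sort the relative jar paths lexicographically so that the resulting list has the same order on every run.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch; not part of Flink. */
public class OrderedJarListingSketch {

    /**
     * Lists all *.jar files below jobDir, relativizes them against workingDir,
     * and sorts them by their string form so the order is deterministic.
     * Assumes both directories live on the same file system root.
     */
    static List<Path> listJarsSorted(Path jobDir, Path workingDir) throws IOException {
        Path base = workingDir.toAbsolutePath();
        try (Stream<Path> files = Files.walk(jobDir)) {
            return files
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".jar"))
                .map(p -> base.relativize(p.toAbsolutePath()))
                .sorted(Comparator.comparing(Path::toString))
                .collect(Collectors.toList());
        }
    }
}
```

Without an explicit sort, the order returned by a directory listing depends on the file system, which is why a `List` alone does not guarantee a stable classpath order.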
[GitHub] [flink] lirui-apache commented on issue #9927: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments
lirui-apache commented on issue #9927: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments URL: https://github.com/apache/flink/pull/9927#issuecomment-546766553 @bowenli86 Has this one been merged? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert
flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert URL: https://github.com/apache/flink/pull/9796#issuecomment-536253430 ## CI report: * 3b84408bb5ee764b1a64103b1f4df3e0ceacaba7 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129578380) * 681339af85489fc3fb2274371067aa2de269e621 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/129582634) * 61c5f74d60d7673ccb8e4d57812bf1d95c876f1f : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/131283509) * 73d33a1b8588776741c8ee1a5030b10ea7cdb580 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT… URL: https://github.com/apache/flink/pull/10001#issuecomment-546558980 ## CI report: * 74748e2c09f1b2b0b9bfb3c1850cd71696d30df8 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133648556) * 407af42116be8c51f02403660e00e9f490bd7454 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-13839) Support to set yarn node label for flink jobmanager and taskmanager container
[ https://issues.apache.org/jira/browse/FLINK-13839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang closed FLINK-13839. - Resolution: Duplicate > Support to set yarn node label for flink jobmanager and taskmanager container > - > > Key: FLINK-13839 > URL: https://issues.apache.org/jira/browse/FLINK-13839 > Project: Flink > Issue Type: New Feature > Components: Deployment / YARN >Reporter: Yang Wang >Priority: Major > > The Yarn node label feature was introduced in 2.6. It is a way to group nodes > with similar characteristics so that applications can specify where to run. In > a production or cloud environment, we want the jobmanager to run on > more stable machines. Node labels could help us achieve that. > > However, ResourceRequest.setNodeLabelExpression is not supported > by the current hadoop dependency version (2.4.1), so we need to bump the > hadoop version to 2.6.5. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-12675) Event time synchronization in Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960301#comment-16960301 ] Jiayi Liao edited comment on FLINK-12675 at 10/28/19 12:03 AM: --- [~thw] I've drafted a design for this. It follows your design for the Kinesis source, but there are still a lot of changes due to the differences between the two connectors' processing models. Could you take a look if you can spare the time? https://docs.google.com/document/d/1d4XhVK4BD9GGNqhfSk_U30nEYOXx8THCHhR77RNbIbA/edit?usp=sharing was (Author: wind_ljy): [~thw] I've drafted a design for this. It follows your design for the Kinesis source, but there are still a lot of changes due to the differences between the two connectors' processing models. Could you take a look if you can spare the time? https://docs.google.com/document/d/1d4XhVK4BD9GGNqhfSk_U30nEYOXx8THCHhR77RNbIbA/edit# > Event time synchronization in Kafka consumer > > > Key: FLINK-12675 > URL: https://issues.apache.org/jira/browse/FLINK-12675 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > Integrate the source watermark tracking into the Kafka consumer and implement > the sync mechanism (different consumer model, compared to Kinesis). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133733201) * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133738302) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133733201) * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133738302) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133733201) * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133733201) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint URL: https://github.com/apache/flink/pull/10008#issuecomment-546719490 ## CI report: * 4e9fe99659cd1826b676b87a1029453fc46edccb : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133729777) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133732010) * 36d23db6c3e89c7044d4eaf916661745c69c349b : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255 ## CI report: * 2b39236d76b1c56d240f2476c535568f0614c577 : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009#issuecomment-546723463 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 2b39236d76b1c56d240f2476c535568f0614c577 (Sun Oct 27 19:02:14 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-14304).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise opened a new pull request #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox
AHeise opened a new pull request #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox URL: https://github.com/apache/flink/pull/10009 ## What is the purpose of the change Currently, all mails are always prioritized over regular input, which makes sense in most cases. However, it's easy to devise an operator that gets into starvation: each mail enqueues a new mail. In concept, this ticket implements a simple extension in the mailbox processor: instead of draining the mailbox one-by-one, fetch all mails from the mailbox and run them one-by-one before running the default action. Only then, fetch all mails again and repeat. So we execute all mails that are available at the start of this loop but no mails that are added in the meantime. ## Brief change log - Add batch capability to `TaskMailbox`. A batch takes a snapshot of the current mails and makes them available for subsequent processing. - `MailProcessor` only processes a batch per invocation of `#processMail()` - Major cleanup of `Mailbox` to clearly show the underlying threading model. ## Verifying this change *(Please pick either of the following options)* Mostly covered by existing tests. - One new test that explicitly tests task starvation. - New tests for the new interfaces. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no, but it refines Mailbox model) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
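The batching idea in the PR description above can be sketched as follows. This is a minimal single-threaded illustration, not Flink's `TaskMailbox` or `MailProcessor`: the class `BatchMailboxSketch` and its methods `put`, `processBatch`, and `demo` are hypothetical names introduced here. It shows how snapshotting the queue keeps a mail that re-enqueues itself from starving the default action.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for a mailbox; not Flink's actual API. */
final class BatchMailboxSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    /** Enqueues a mail; may be called from inside a running mail. */
    void put(Runnable mail) {
        mails.add(mail);
    }

    /**
     * Takes a snapshot of the mails present right now and runs only those.
     * Mails enqueued while the batch runs wait for the next batch.
     */
    int processBatch() {
        List<Runnable> batch = new ArrayList<>(mails);
        mails.clear();
        for (Runnable mail : batch) {
            mail.run();
        }
        return batch.size();
    }

    /**
     * Runs `rounds` iterations of "process one batch, then run the default
     * action" with a mail that always re-enqueues itself.
     * Returns {mailsRun, defaultActionsRun}.
     */
    static int[] demo(int rounds) {
        BatchMailboxSketch mailbox = new BatchMailboxSketch();
        int[] counts = new int[2];
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            counts[0]++;          // one mail executed
            mailbox.put(self[0]); // the starvation pattern: each mail enqueues a new mail
        };
        mailbox.put(self[0]);
        for (int i = 0; i < rounds; i++) {
            mailbox.processBatch();
            counts[1]++;          // the default action gets its turn after every batch
        }
        return counts;
    }
}
```

With a one-by-one drain, the first round would never finish because the mailbox is never empty. With the snapshot, `BatchMailboxSketch.demo(3)` runs three mails and three default actions.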
[jira] [Updated] (FLINK-14304) Avoid task starvation with mailbox
[ https://issues.apache.org/jira/browse/FLINK-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14304: --- Labels: pull-request-available (was: ) > Avoid task starvation with mailbox > -- > > Key: FLINK-14304 > URL: https://issues.apache.org/jira/browse/FLINK-14304 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Arvid Heise >Priority: Major > Labels: pull-request-available > > Currently, all mails are always prioritized over regular input, which makes > sense in most cases. However, it's easy to devise an operator that gets into > starvation: each mail enqueues a new mail. > This ticket implements a simple extension in the mailbox processor: instead > of draining the mailbox one-by-one, fetch all mails from the mailbox and run > them one-by-one before running the default action. Only then, fetch all mails > again and repeat. > So we execute all mails that are available at the start of this loop but no > mails that are added in the meantime. > Special attention needs to be directed towards yield to downstream, such that > it doesn't process mails outside of the current batch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint URL: https://github.com/apache/flink/pull/10008#issuecomment-546719490 ## CI report: * 4e9fe99659cd1826b676b87a1029453fc46edccb : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/133729777) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
flinkbot commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint URL: https://github.com/apache/flink/pull/10008#issuecomment-546719490 ## CI report: * 4e9fe99659cd1826b676b87a1029453fc46edccb : UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build
[GitHub] [flink] flinkbot commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
flinkbot commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint URL: https://github.com/apache/flink/pull/10008#issuecomment-546719029 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 4e9fe99659cd1826b676b87a1029453fc46edccb (Sun Oct 27 18:06:41 UTC 2019) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] Myasuka commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
Myasuka commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint URL: https://github.com/apache/flink/pull/10008#issuecomment-546718936 CC @tillrohrmann
[jira] [Updated] (FLINK-14403) Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
[ https://issues.apache.org/jira/browse/FLINK-14403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-14403: --- Labels: pull-request-available (was: ) > Remove uesless NotifyCheckpointComplete and TriggerCheckpoint > - > > Key: FLINK-14403 > URL: https://issues.apache.org/jira/browse/FLINK-14403 > Project: Flink > Issue Type: Bug > Components: Runtime / Task > Reporter: Yun Tang > Assignee: Yun Tang > Priority: Major > Labels: pull-request-available > > After [FLINK-12322|https://issues.apache.org/jira/browse/FLINK-12322] was fixed, > the legacy {{ActorTaskManagerGateway}} was removed and the usages of > {{NotifyCheckpointComplete}} and {{TriggerCheckpoint}} have been disabled. > However, these classes still exist and should also be removed.
[GitHub] [flink] Myasuka opened a new pull request #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
Myasuka opened a new pull request #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint URL: https://github.com/apache/flink/pull/10008 ## What is the purpose of the change After FLINK-12322 was fixed, the legacy ActorTaskManagerGateway was removed and the usages of NotifyCheckpointComplete and TriggerCheckpoint have been disabled. However, these classes still exist and should also be removed. ## Brief change log - Remove the code of `NotifyCheckpointComplete` and `TriggerCheckpoint` and their usage in the test `CheckpointMessagesTest`. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[jira] [Commented] (FLINK-14482) Bump up rocksdb version to support WriteBufferManager
[ https://issues.apache.org/jira/browse/FLINK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960649#comment-16960649 ] Yun Tang commented on FLINK-14482: -- [~trohrmann] Yes, we first planned to upgrade the base RocksDB version of FRocksDB but found a performance regression. For now, I think we still have to keep the previous base RocksDB version and cherry-pick the write-buffer manager feature code for Flink-1.10. From my point of view, this issue targets Flink-1.11 and should remain {{open}}. > Bump up rocksdb version to support WriteBufferManager > - > > Key: FLINK-14482 > URL: https://issues.apache.org/jira/browse/FLINK-14482 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends > Reporter: Yun Tang > Assignee: Yun Tang > Priority: Major > Fix For: 1.11.0 > > > Current rocksDB-5.17.2 does not support write buffer manager well, we need to > bump rocksdb version to support that feature.
[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution. URL: https://github.com/apache/flink/pull/9733#discussion_r339354617 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.statistics.basicstatistic; + +import org.apache.flink.ml.common.linalg.BLAS; +import org.apache.flink.ml.common.linalg.DenseMatrix; +import org.apache.flink.ml.common.linalg.DenseVector; +import org.apache.flink.ml.common.linalg.SparseVector; +import org.apache.flink.ml.common.linalg.Vector; + +import com.github.fommil.netlib.LAPACK; +import org.netlib.util.intW; + +/** + * This class provides basic functionality for a Multivariate Gaussian (Normal) Distribution. 
+ */ +public class MultivariateGaussian { + + private static final LAPACK LAPACK_INST = LAPACK.getInstance(); + private static final double EPSILON; + + static { + double eps = 1.0; + while ((1.0 + (eps / 2.0)) != 1.0) { + eps /= 2.0; + } + EPSILON = eps; + } + + private final DenseVector mean; + private final DenseMatrix cov; + + private DenseMatrix rootSigmaInv; + private double u; + + // data buffers for computing pdf + private DenseVector delta; + private DenseVector v; + + /** +* The constructor. +* +* @param mean The mean vector of the distribution. +* @param cov The covariance matrix of the distribution. +*/ + public MultivariateGaussian(DenseVector mean, DenseMatrix cov) { + this.mean = mean; + this.cov = cov; + this.delta = DenseVector.zeros(mean.size()); + this.v = DenseVector.zeros(mean.size()); + calculateCovarianceConstants(); + } + + /** +* Returns density of this multivariate Gaussian at given point, x. +*/ + public double pdf(Vector x) { + return Math.exp(logpdf(x)); + } + + /** +* Returns the log-density of this multivariate Gaussian at given point, x. +*/ + public double logpdf(Vector x) { + int n = mean.size(); + System.arraycopy(mean.getData(), 0, delta.getData(), 0, n); + BLAS.scal(-1.0, delta); + if (x instanceof DenseVector) { + BLAS.axpy(1., (DenseVector) x, delta); + } else if (x instanceof SparseVector) { + BLAS.axpy(1., (SparseVector) x, delta); + } + BLAS.gemv(1.0, rootSigmaInv, false, delta, 0., v); + return u - 0.5 * BLAS.dot(v, v); + } + + /** +* Compute distribution dependent constants. 
+* +* The probability density function is calculated as: +* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu)) +* +* Here we compute the following distribution dependent constants that can be reused in each pdf computation: +* A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^) +* B) rootSigmaInv = sqrt(inv(sigma)) = D^(-1/2)^ * U.t +* +* +* sigma = U * D * U.t +* inv(sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * (D^{-1/2}^ * U.t) +* sqrt(inv(sigma)) = D^(-1/2)^ * U.t +* +*/ + private void calculateCovarianceConstants() { + int n = this.mean.size(); + int lwork = 3 * n - 1; + double[] matA = new double[n * n]; + double[] work = new double[lwork]; + double[] evs = new double[n]; + intW info = new intW(0); + + for (int i = 0; i < n; i++) { + System.arraycopy(cov.getData(), i * n, matA, i * n, i + 1); + } + LAPACK_INST.dsyev("V", "U", n, matA, n, evs, work, lwork, info); + +
[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution. URL: https://github.com/apache/flink/pull/9733#discussion_r339354266 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.statistics.basicstatistic; + +import org.apache.flink.ml.common.linalg.BLAS; +import org.apache.flink.ml.common.linalg.DenseMatrix; +import org.apache.flink.ml.common.linalg.DenseVector; +import org.apache.flink.ml.common.linalg.SparseVector; +import org.apache.flink.ml.common.linalg.Vector; + +import com.github.fommil.netlib.LAPACK; +import org.netlib.util.intW; + +/** + * This class provides basic functionality for a Multivariate Gaussian (Normal) Distribution. 
+ */ +public class MultivariateGaussian { + + private static final LAPACK LAPACK_INST = LAPACK.getInstance(); + private static final double EPSILON; + + static { + double eps = 1.0; + while ((1.0 + (eps / 2.0)) != 1.0) { + eps /= 2.0; + } + EPSILON = eps; + } + + private final DenseVector mean; + private final DenseMatrix cov; + + private DenseMatrix rootSigmaInv; + private double u; Review comment: members can be `final`?
[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution. URL: https://github.com/apache/flink/pull/9733#discussion_r339354298 ## File path: flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussianTest.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.statistics.basicstatistic; + +import org.apache.flink.ml.common.linalg.DenseMatrix; +import org.apache.flink.ml.common.linalg.DenseVector; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for MultivariateGaussian. + */ +public class MultivariateGaussianTest { + private static final double TOL = 1.0e-5; + Review comment: add test for `Univariate` as well just in case? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
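One way to act on the univariate suggestion is to compare against the closed-form one-dimensional normal density, which is what the multivariate formula reduces to when k = 1 and the covariance is a single variance. The sketch below is only a reference implementation of that closed form. `UnivariateGaussianReference` is a hypothetical helper, not part of the PR, and it does not call `MultivariateGaussian` itself.

```java
/** Hypothetical reference for the k = 1 case; not part of the PR under review. */
final class UnivariateGaussianReference {

    /** log N(x | mean, variance) = -0.5 * log(2*pi*variance) - (x - mean)^2 / (2 * variance). */
    static double logpdf(double x, double mean, double variance) {
        double d = x - mean;
        return -0.5 * Math.log(2.0 * Math.PI * variance) - 0.5 * d * d / variance;
    }

    /** Density obtained by exponentiating the log-density, mirroring pdf(x) = exp(logpdf(x)). */
    static double pdf(double x, double mean, double variance) {
        return Math.exp(logpdf(x, mean, variance));
    }
}
```

A test could build a 1x1 covariance matrix, evaluate the class under review at a few points, and assert agreement with these values within the test's `TOL`.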
[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution. URL: https://github.com/apache/flink/pull/9733#discussion_r339354650 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.statistics.basicstatistic; + +import org.apache.flink.ml.common.linalg.BLAS; +import org.apache.flink.ml.common.linalg.DenseMatrix; +import org.apache.flink.ml.common.linalg.DenseVector; +import org.apache.flink.ml.common.linalg.SparseVector; +import org.apache.flink.ml.common.linalg.Vector; + +import com.github.fommil.netlib.LAPACK; +import org.netlib.util.intW; + +/** + * This class provides basic functionality for a Multivariate Gaussian (Normal) Distribution. 
+ */ +public class MultivariateGaussian { + + private static final LAPACK LAPACK_INST = LAPACK.getInstance(); + private static final double EPSILON; + + static { + double eps = 1.0; + while ((1.0 + (eps / 2.0)) != 1.0) { + eps /= 2.0; + } + EPSILON = eps; + } + + private final DenseVector mean; + private final DenseMatrix cov; + + private DenseMatrix rootSigmaInv; + private double u; + + // data buffers for computing pdf + private DenseVector delta; + private DenseVector v; + + /** +* The constructor. +* +* @param mean The mean vector of the distribution. +* @param cov The covariance matrix of the distribution. +*/ + public MultivariateGaussian(DenseVector mean, DenseMatrix cov) { + this.mean = mean; + this.cov = cov; + this.delta = DenseVector.zeros(mean.size()); + this.v = DenseVector.zeros(mean.size()); + calculateCovarianceConstants(); + } + + /** +* Returns density of this multivariate Gaussian at given point, x. +*/ + public double pdf(Vector x) { + return Math.exp(logpdf(x)); + } + + /** +* Returns the log-density of this multivariate Gaussian at given point, x. +*/ + public double logpdf(Vector x) { + int n = mean.size(); + System.arraycopy(mean.getData(), 0, delta.getData(), 0, n); + BLAS.scal(-1.0, delta); + if (x instanceof DenseVector) { + BLAS.axpy(1., (DenseVector) x, delta); + } else if (x instanceof SparseVector) { + BLAS.axpy(1., (SparseVector) x, delta); + } + BLAS.gemv(1.0, rootSigmaInv, false, delta, 0., v); + return u - 0.5 * BLAS.dot(v, v); + } + + /** +* Compute distribution dependent constants. 
+* +* The probability density function is calculated as: +* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu)) +* +* Here we compute the following distribution dependent constants that can be reused in each pdf computation: +* A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^) +* B) rootSigmaInv = sqrt(inv(sigma)) = D^(-1/2)^ * U.t +* +* +* sigma = U * D * U.t +* inv(sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * (D^{-1/2}^ * U.t) +* sqrt(inv(sigma)) = D^(-1/2)^ * U.t +* +*/ + private void calculateCovarianceConstants() { + int n = this.mean.size(); + int lwork = 3 * n - 1; + double[] matA = new double[n * n]; + double[] work = new double[lwork]; + double[] evs = new double[n]; + intW info = new intW(0); + + for (int i = 0; i < n; i++) { + System.arraycopy(cov.getData(), i * n, matA, i * n, i + 1); + } + LAPACK_INST.dsyev("V", "U", n, matA, n, evs, work, lwork, info); + +
[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution. URL: https://github.com/apache/flink/pull/9733#discussion_r339354278 ## File path: flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.common.statistics.basicstatistic; + +import org.apache.flink.ml.common.linalg.BLAS; +import org.apache.flink.ml.common.linalg.DenseMatrix; +import org.apache.flink.ml.common.linalg.DenseVector; +import org.apache.flink.ml.common.linalg.SparseVector; +import org.apache.flink.ml.common.linalg.Vector; + +import com.github.fommil.netlib.LAPACK; +import org.netlib.util.intW; + +/** + * This class provides basic functionality for a Multivariate Gaussian (Normal) Distribution. 
+ */ +public class MultivariateGaussian { + + private static final LAPACK LAPACK_INST = LAPACK.getInstance(); + private static final double EPSILON; + + static { + double eps = 1.0; + while ((1.0 + (eps / 2.0)) != 1.0) { + eps /= 2.0; + } + EPSILON = eps; + } + + private final DenseVector mean; + private final DenseMatrix cov; + + private DenseMatrix rootSigmaInv; + private double u; + + // data buffers for computing pdf + private DenseVector delta; + private DenseVector v; + + /** +* The constructor. +* +* @param mean The mean vector of the distribution. +* @param cov The covariance matrix of the distribution. +*/ + public MultivariateGaussian(DenseVector mean, DenseMatrix cov) { + this.mean = mean; + this.cov = cov; + this.delta = DenseVector.zeros(mean.size()); + this.v = DenseVector.zeros(mean.size()); + calculateCovarianceConstants(); + } + + /** +* Returns density of this multivariate Gaussian at given point, x. +*/ + public double pdf(Vector x) { + return Math.exp(logpdf(x)); + } + + /** +* Returns the log-density of this multivariate Gaussian at given point, x. +*/ + public double logpdf(Vector x) { + int n = mean.size(); + System.arraycopy(mean.getData(), 0, delta.getData(), 0, n); + BLAS.scal(-1.0, delta); + if (x instanceof DenseVector) { + BLAS.axpy(1., (DenseVector) x, delta); + } else if (x instanceof SparseVector) { + BLAS.axpy(1., (SparseVector) x, delta); + } + BLAS.gemv(1.0, rootSigmaInv, false, delta, 0., v); + return u - 0.5 * BLAS.dot(v, v); + } + + /** +* Compute distribution dependent constants. 
+* +* The probability density function is calculated as: +* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu)) +* +* Here we compute the following distribution dependent constants that can be reused in each pdf computation: +* A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^) +* B) rootSigmaInv = sqrt(inv(sigma)) = D^(-1/2)^ * U.t +* +* +* sigma = U * D * U.t +* inv(sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * (D^{-1/2}^ * U.t) +* sqrt(inv(sigma)) = D^(-1/2)^ * U.t +* +*/ + private void calculateCovarianceConstants() { + int n = this.mean.size(); + int lwork = 3 * n - 1; + double[] matA = new double[n * n]; + double[] work = new double[lwork]; + double[] evs = new double[n]; + intW info = new intW(0); + + for (int i = 0; i < n; i++) { + System.arraycopy(cov.getData(), i * n, matA, i * n, i + 1); + } Review comment: `System.arraycopy(cov.getData(), 0, matA, 0, n*n);`?
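The difference between the loop in the diff and the reviewer's single `System.arraycopy` can be seen in a small standalone sketch. It assumes the matrix data is a column-major `double[]` of length n*n, which the quoted diff does not confirm. `TriangleCopySketch` and its methods are hypothetical names. Under that assumption, copying the first `i + 1` entries of column `i` copies exactly the upper triangle, while the one-call copy duplicates the whole array.

```java
/** Hypothetical illustration; assumes column-major storage of an n x n matrix. */
final class TriangleCopySketch {

    /** Mirrors the loop in the diff: copies the first (i + 1) entries of column i. */
    static double[] copyUpperTriangle(double[] src, int n) {
        double[] dst = new double[n * n];
        for (int i = 0; i < n; i++) {
            System.arraycopy(src, i * n, dst, i * n, i + 1);
        }
        return dst;
    }

    /** Mirrors the reviewer's suggestion: copies all n * n entries in one call. */
    static double[] copyFull(double[] src, int n) {
        double[] dst = new double[n * n];
        System.arraycopy(src, 0, dst, 0, n * n);
        return dst;
    }
}
```

For the 2x2 input `{1, 2, 3, 4}`, the loop yields `{1, 0, 3, 4}` and the full copy yields `{1, 2, 3, 4}`. Since LAPACK's `dsyev` called with `"U"` reads only the upper triangle of the input, either copy should give the same decomposition for a symmetric covariance matrix; the single call is simply shorter.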