[GitHub] [flink] flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client 
sporadically hangs after Ctrl + C
URL: https://github.com/apache/flink/pull/10010#issuecomment-546787757
 
 
   
   ## CI report:
   
   * 4b0f9efdc169d3b6375d469cd3512e87e98f0f19 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133758455)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339408894
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+	/**
+	 * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is
+	 * correctly maintained and the future callback is correctly processed.
+	 */
+	@Test
+	public void testBufferAvailabilityAndFutureCallback() throws Exception {
+		final int numBuffers = 10;
+		final int numberOfSegmentsToRequest = 5;
+		final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+		final NetworkBufferPool globalPool = new NetworkBufferPool(
+			numBuffers,
+			128,
+			numberOfSegmentsToRequest,
+			requestSegmentsTimeout);
+
+		try {
+			assertTrue(globalPool.isAvailable().isDone());
+
+			List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+			for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+				MemorySegment segment = globalPool.requestMemorySegment();
+				assertNotNull(segment);
+				segments.add(segment);
+				assertTrue(globalPool.isAvailable().isDone());
+			}
+
+			final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
+			assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size());
+
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
+
+			final AtomicBoolean completeFlag = new AtomicBoolean(false);
+			CompletableFuture<?> availableFuture = globalPool.isAvailable();
+			availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true));
+
+			// recycle one buffer
+			globalPool.recycle(segments.get(0));
+			assertTrue(completeFlag.get());
+			assertTrue(availableFuture.isDone());
+			assertTrue(globalPool.isAvailable().isDone());
+			assertEquals(1, globalPool.getNumberOfAvailableMemorySegments());
+
+			CheckedThread asyncRequest = new CheckedThread() {
+				@Override
+				public void go() throws Exception {
+					exclusiveSegments.addAll(globalPool.requestMemorySegments());
+				}
+			};
+			asyncRequest.start();
+
+			// wait until no buffer is available
+			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L));
+			while (globalPool.getNumberOfAvailableMemorySegments() > 0) {
+				Thread.sleep(50);
+				if (!deadline.hasTimeLeft()) {
+					fail("Waiting timeout.");
+				}
+			}
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size());
+
+			for (int i = 1; i < numberOfSegmentsToRequest; ++i) {
+				globalPool.recycle(segments.get(i));
+			}
+			segments.clear();
+
+			asyncRequest.sync();
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(numBuffers, exclusiveSegments.size());
+			assertFalse(globalPool.isAvailable().isDone());
+
+			globalPool.recycleMemorySegments(exclusiveSegments);
+			exclusiveSegments.clear();
+			assertTrue(globalPool.isAvailable().isDone());
+			assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
 
 Review comment:
   ```
   globalPool.recycleMemorySegments(exclusiveSegments);
   exclusiveSegments.clear();
   assertTrue(globalPool.isAvailable().isDone());
   assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
   ```
   This part is only for resource recycling at the end, so we can remove the last 

[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339407704
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the requirements on the
+  * source implementation would be high: it would have to accurately control the record count of
+  * each split, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down to the source after the
+  * limit has been pushed down. The source would then need to know that it should apply the limit
+  * first and the filter afterwards, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+operand(classOf[FlinkLogicalTableSourceScan], none)), 
"PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset equals zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset usually
+// appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+  && newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
 
 Review comment:
   Related PR: https://github.com/apache/flink/pull/8468
   Something could be done in `TableSource.explainSource`; it is a default method and can contain some logic.
   But I think this check should still exist.


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339408191
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+	/**
+	 * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is
+	 * correctly maintained and the future callback is correctly processed.
+	 */
+	@Test
+	public void testBufferAvailabilityAndFutureCallback() throws Exception {
+		final int numBuffers = 10;
+		final int numberOfSegmentsToRequest = 5;
+		final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+		final NetworkBufferPool globalPool = new NetworkBufferPool(
+			numBuffers,
+			128,
+			numberOfSegmentsToRequest,
+			requestSegmentsTimeout);
+
+		try {
+			assertTrue(globalPool.isAvailable().isDone());
+
+			List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+			for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+				MemorySegment segment = globalPool.requestMemorySegment();
+				assertNotNull(segment);
+				segments.add(segment);
+				assertTrue(globalPool.isAvailable().isDone());
+			}
+
+			final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
+			assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size());
+
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
+
+			final AtomicBoolean completeFlag = new AtomicBoolean(false);
+			CompletableFuture<?> availableFuture = globalPool.isAvailable();
+			availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true));
+
+			// recycle one buffer
+			globalPool.recycle(segments.get(0));
+			assertTrue(completeFlag.get());
+			assertTrue(availableFuture.isDone());
+			assertTrue(globalPool.isAvailable().isDone());
+			assertEquals(1, globalPool.getNumberOfAvailableMemorySegments());
+
+			CheckedThread asyncRequest = new CheckedThread() {
+				@Override
+				public void go() throws Exception {
+					exclusiveSegments.addAll(globalPool.requestMemorySegments());
+				}
+			};
+			asyncRequest.start();
+
+			// wait until no buffer is available
+			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L));
+			while (globalPool.getNumberOfAvailableMemorySegments() > 0) {
+				Thread.sleep(50);
+				if (!deadline.hasTimeLeft()) {
+					fail("Waiting timeout.");
+				}
+			}
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size());
+
+			for (int i = 1; i < numberOfSegmentsToRequest; ++i) {
+				globalPool.recycle(segments.get(i));
+			}
+			segments.clear();
+
+			asyncRequest.sync();
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(numBuffers, exclusiveSegments.size());
 
 Review comment:
   It is not very strict to use `numBuffers` here. Actually it should be `2 * numberOfSegmentsToRequest`; it is only coincidental that `numBuffers = 2 * numberOfSegmentsToRequest`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339408562
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the requirements on the
+  * source implementation would be high: it would have to accurately control the record count of
+  * each split, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down to the source after the
+  * limit has been pushed down. The source would then need to know that it should apply the limit
+  * first and the filter afterwards, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+operand(classOf[FlinkLogicalTableSourceScan], none)), 
"PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset equals zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset usually
+// appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
 
 Review comment:
   It could be; I think that is a third reason to keep the original limit RelNode.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool

2019-10-27 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960766#comment-16960766
 ] 

Jiayi Liao commented on FLINK-14498:


[~kevin.cyj] Thanks for your explanation :).

> Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
> --
>
> Key: FLINK-14498
> URL: https://issues.apache.org/jira/browse/FLINK-14498
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If the LocalBufferPool cannot request an available buffer from 
> NetworkBufferPool, it waits for 2 seconds before trying to request again in a 
> loop. Therefore it can introduce some delay in practice.
> To improve this interaction, we could introduce NetworkBufferPool#isAvailable 
> to return a future which would be monitored by LocalBufferPool. Then, once 
> there are available buffers in NetworkBufferPool, it would complete this 
> future to notify LocalBufferPool immediately. 
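
For illustration, the interaction described above could look roughly like the following. This is a minimal sketch, not Flink's actual implementation; {{NetworkBufferPoolLike}} is a hypothetical stand-in for the pool's availability API.
{code:java}
import java.util.concurrent.CompletableFuture;

class BufferAvailabilityListenerSketch {

	// Hypothetical minimal interface standing in for NetworkBufferPool in this sketch.
	interface NetworkBufferPoolLike {
		CompletableFuture<?> isAvailable();
	}

	// Instead of polling every 2 seconds, the local pool reacts as soon as the
	// availability future completes (i.e. a segment was recycled to the global pool).
	static void requestWhenAvailable(NetworkBufferPoolLike globalPool, Runnable retryRequest) {
		CompletableFuture<?> available = globalPool.isAvailable();
		if (available.isDone()) {
			retryRequest.run();              // buffers are already available, request right away
		} else {
			available.thenRun(retryRequest); // otherwise get notified immediately on recycle
		}
	}
}
{code}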



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool

2019-10-27 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960764#comment-16960764
 ] 

Yingjie Cao commented on FLINK-14498:
-

[~wind_ljy] The PR is based on another PR 
[https://github.com/apache/flink/pull/9905] which has not been merged yet. Only 
the last commit is relevant to this Jira. I will rebase once the dependent PR is 
merged.

> Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
> --
>
> Key: FLINK-14498
> URL: https://issues.apache.org/jira/browse/FLINK-14498
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If the LocalBufferPool cannot request an available buffer from 
> NetworkBufferPool, it waits for 2 seconds before trying to request again in a 
> loop. Therefore it can introduce some delay in practice.
> To improve this interaction, we could introduce NetworkBufferPool#isAvailable 
> to return a future which would be monitored by LocalBufferPool. Then, once 
> there are available buffers in NetworkBufferPool, it would complete this 
> future to notify LocalBufferPool immediately. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW merged pull request #9905: [FLINK-14396][network] Implement rudimentary non-blocking network output

2019-10-27 Thread GitBox
zhijiangW merged pull request #9905: [FLINK-14396][network] Implement 
rudimentary non-blocking network output
URL: https://github.com/apache/flink/pull/9905
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339407219
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A table source which supports pushing the limit down to the source.
+  */
+class TestLimitableTableSource(
+data: Seq[Row],
+rowType: RowTypeInfo,
+var limit: Long = -1,
+var limitablePushedDown: Boolean = false)
+  extends StreamTableSource[Row]
+  with LimitableTableSource[Row] {
+
+  override def isBounded = true
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    val dataSet = if (limit > 0) {
+      data.take(limit.toInt).asJava
+    } else {
+      data.asJava
+    }
+    execEnv.createInput(
+      new CollectionInputFormat(dataSet, rowType.createSerializer(new ExecutionConfig)),
+      rowType)
+  }
+
+  override def applyLimit(limit: Long): TableSource[Row] = {
+new TestLimitableTableSource(data, rowType, limit, limitablePushedDown)
+  }
+
+  override def isLimitPushedDown: Boolean = limitablePushedDown
+
+  override def getReturnType: TypeInformation[Row] = rowType
+
+  override def explainSource(): String = {
+if (limit > 0 && limit < Long.MaxValue) {
+  "limit: " + limit
+} else if (limitablePushedDown) {
+  "limitablePushedDown"
 
 Review comment:
   Yes, I will add:
   ```
   if (limit == 0) {
     throw new RuntimeException("limit 0 should be optimized to single values.")
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339407217
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limiting push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ *
+ * After pushing down, the source only needs to try its best to limit the number of output
+ * records, but it does not need to guarantee that the number is less than or equal to the limit.
 
 Review comment:
   OK, that makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339407007
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+	/**
+	 * Tests {@link NetworkBufferPool#isAvailable()}, verifying that the buffer availability is
+	 * correctly maintained and the future callback is correctly processed.
+	 */
+	@Test
+	public void testBufferAvailabilityAndFutureCallback() throws Exception {
+		final int numBuffers = 10;
+		final int numberOfSegmentsToRequest = 5;
+		final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+		final NetworkBufferPool globalPool = new NetworkBufferPool(
+			numBuffers,
+			128,
+			numberOfSegmentsToRequest,
+			requestSegmentsTimeout);
+
+		try {
+			assertTrue(globalPool.isAvailable().isDone());
+
+			List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+			for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+				MemorySegment segment = globalPool.requestMemorySegment();
+				assertNotNull(segment);
+				segments.add(segment);
+				assertTrue(globalPool.isAvailable().isDone());
+			}
+
+			final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
+			assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size());
+
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
+
+			final AtomicBoolean completeFlag = new AtomicBoolean(false);
+			CompletableFuture<?> availableFuture = globalPool.isAvailable();
+			availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true));
+
+			// recycle one buffer
+			globalPool.recycle(segments.get(0));
+			assertTrue(completeFlag.get());
+			assertTrue(availableFuture.isDone());
+			assertTrue(globalPool.isAvailable().isDone());
+			assertEquals(1, globalPool.getNumberOfAvailableMemorySegments());
+
+			CheckedThread asyncRequest = new CheckedThread() {
+				@Override
+				public void go() throws Exception {
+					exclusiveSegments.addAll(globalPool.requestMemorySegments());
+				}
+			};
+			asyncRequest.start();
+
+			// wait until no buffer is available
+			final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10L));
+			while (globalPool.getNumberOfAvailableMemorySegments() > 0) {
+				Thread.sleep(50);
+				if (!deadline.hasTimeLeft()) {
+					fail("Waiting timeout.");
+				}
+			}
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(numberOfSegmentsToRequest, exclusiveSegments.size());
+
+			for (int i = 1; i < numberOfSegmentsToRequest; ++i) {
+				globalPool.recycle(segments.get(i));
+			}
+			segments.clear();
+
+			asyncRequest.sync();
+			assertFalse(globalPool.isAvailable().isDone());
+			assertEquals(numBuffers, exclusiveSegments.size());
+			assertFalse(globalPool.isAvailable().isDone());
 
 Review comment:
   Duplicate of line 601.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339406809
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limiting push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ *
+ * After pushing down, the source only needs to try its best to limit the number of output
+ * records, but it does not need to guarantee that the number is less than or equal to the limit.
 
 Review comment:
   Maybe; if the total number of records is less than the limit, it could be.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool

2019-10-27 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960762#comment-16960762
 ] 

Jiayi Liao edited comment on FLINK-14498 at 10/28/19 5:27 AM:
--

[~zjwang][~kevin.cyj]  Maybe I'm not very familiar with the whole story, but I have a 
question after reviewing the PR. According to the JIRA description and what I 
understand, the problem happens when {{LocalBufferPool}} requests a buffer from 
{{NetworkBufferPool}}, which means the scope should be narrowed down to 
{{LocalBufferPool}} and {{NetworkBufferPool}}. But why do you extend 
{{isAvailable}} to the {{RecordWriter}}-related classes? Maybe this is a stupid 
question but I'm a bit confused.


was (Author: wind_ljy):
[~zjwang][~kevin.cyj]  Maybe I'm not very familiar with the whole story, but I have a 
question after reviewing the PR. According to the JIRA description and what I 
understand, the problem happens when {{LocalBufferPool}} requests a buffer from 
{{NetworkBufferPool}}, which means the scope should be narrowed down to 
LocalBufferPool and NetworkBufferPool. But why do you extend 
{{isAvailable}} to the {{RecordWriter}}-related classes? Maybe this is a stupid 
question but I'm a bit confused.

> Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
> --
>
> Key: FLINK-14498
> URL: https://issues.apache.org/jira/browse/FLINK-14498
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If the LocalBufferPool cannot request an available buffer from 
> NetworkBufferPool, it waits for 2 seconds before trying to request again in a 
> loop. Therefore it can introduce some delay in practice.
> To improve this interaction, we could introduce NetworkBufferPool#isAvailable 
> to return a future which would be monitored by LocalBufferPool. Then, once 
> there are available buffers in NetworkBufferPool, it would complete this 
> future to notify LocalBufferPool immediately. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool

2019-10-27 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960762#comment-16960762
 ] 

Jiayi Liao commented on FLINK-14498:


[~zjwang][~kevin.cyj]  Maybe I'm not very familiar with the whole story, but I have a 
question after reviewing the PR. According to the JIRA description and what I 
understand, the problem happens when {{LocalBufferPool}} requests a buffer from 
{{NetworkBufferPool}}, which means the scope should be narrowed down to 
LocalBufferPool and NetworkBufferPool. But why do you extend 
{{isAvailable}} to the {{RecordWriter}}-related classes? Maybe this is a stupid 
question but I'm a bit confused.

> Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
> --
>
> Key: FLINK-14498
> URL: https://issues.apache.org/jira/browse/FLINK-14498
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: Yingjie Cao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If the LocalBufferPool cannot request an available buffer from 
> NetworkBufferPool, it waits for 2 seconds before trying to request again in a 
> loop. Therefore it can introduce some delay in practice.
> To improve this interaction, we could introduce NetworkBufferPool#isAvailable 
> to return a future which would be monitored by LocalBufferPool. Then, once 
> there are available buffers in NetworkBufferPool, it would complete this 
> future to notify LocalBufferPool immediately. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339406297
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the requirements on the
+  * source implementation would be high: it would have to accurately control the record count of
+  * each split, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down to the source after the
+  * limit has been pushed down. The source would then need to know that it should apply the limit
+  * first and the filter afterwards, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+operand(classOf[FlinkLogicalTableSourceScan], none)), 
"PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset equals zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset usually
+// appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+  && newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
+  + "table source with pushdown capability must override and change "
+  + "explainSource() API to explain the pushdown applied!")
+}
+
+call.transformTo(sort.copy(sort.getTraitSet, 

[jira] [Commented] (FLINK-14026) Manage the resource of Python worker properly

2019-10-27 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960761#comment-16960761
 ] 

Dian Fu commented on FLINK-14026:
-

[~sunjincheng121] Thanks a lot for the reminder. I have drafted a design doc 
which is integrated with 
[FLIP-53|https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management],
 which proposes fine-grained operator resource management. I will 
submit it to the DEV mailing list for discussion later on.

> Manage the resource of Python worker properly
> -
>
> Key: FLINK-14026
> URL: https://issues.apache.org/jira/browse/FLINK-14026
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.10.0
>
>
> For a Flink Table API & SQL job, if it uses Python user-defined functions, 
> the Java operator will launch separate Python process for Python user-defined 
> function execution. We should make sure that the resources used by the Python 
> process are managed by Flink’s resource management framework.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339406308
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limiting push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ *
+ * After pushing down, the source only needs to try its best to limit the number of output
+ * records, but it does not need to guarantee that the number is less than or equal to the limit.
 
 Review comment:
   Btw, regarding `less than or equal to the limit`: the source shouldn't generate fewer 
records than the limit, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339405733
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limiting push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ *
+ * After pushing down, the source only needs to try its best to limit the number of output
+ * records, but it does not need to guarantee that the number is less than or equal to the limit.
 
 Review comment:
   I think this wording just tells the user: there is no hard requirement, you just do what you can.
   The comments in `PushLimitIntoTableSourceScanRule` are difficult to understand without the code. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339405733
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limiting push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number 
of records.
+ *
+ * After pushing down, source only needs to try its best to limit the 
number of output records,
+ * but does not need to guarantee that the number must be less than or equal 
to the limit.
 
 Review comment:
   I think this wording just tells the user: there is no hard requirement, you just do what you can.
   The comments in `PushLimitIntoTableSourceScanRule` are difficult to understand without the code; 
I think we can just add a link to `PushLimitIntoTableSourceScanRule`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1

2019-10-27 Thread Shuwen Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960759#comment-16960759
 ] 

Shuwen Zhou commented on FLINK-14538:
-

[~wind_ljy]

It was deployed on a YARN cluster.

The command of the job manager is the following:
{code:java}
/usr/lib/jvm/java-1.8.0-openjdk/bin/java -Xms2304m -Xmx2304m -XX:+PrintHeapAtGC 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution 
-Dlog.file=/ebs_hadoop_data/yarn-data/log/application_156508035_24/container_e20_156508035_24_01_01/jobmanager.log
 -Dlog4j.configuration=file:log4j.properties 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
{code}
The command of the task manager is:
{code:java}
/usr/lib/jvm/java-1.8.0-openjdk/bin/java -Xms9830m -Xmx9830m 
-XX:MaxDirectMemorySize=6554m -XX:+PrintHeapAtGC -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution 
-Dlog.file=/ebs_hadoop_data/yarn-data/log/application_156508035_24/container_e20_156508035_24_01_000163/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
{code}
Thanks.

I was expecting the NonHeap memory to be reported with a default value.

 

> Metrics of Status.JVM.Memory.NonHeap.Max is always -1
> -
>
> Key: FLINK-14538
> URL: https://issues.apache.org/jira/browse/FLINK-14538
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Shuwen Zhou
>Priority: Minor
> Attachments: image-2019-10-28-12-34-53-413.png
>
>
> I'm collecting JobManager and TaskManager status metrics to DataDog.
> While all metrics of both JobManager and TaskManager 
> Status.JVM.Memory.NonHeap.Max is always -1
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the 
endInput call of the downstream operator to after the upstream operator closes
URL: https://github.com/apache/flink/pull/9854#issuecomment-539497852
 
 
   
   ## CI report:
   
   * 9adcb150ed88bfa077642f6f590ec4ae9183baf5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130951546)
   * 7beda6a8760882ae464701d2f9faf5ba4bb9be3a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133562068)
   * d035936297745e9e7d6189680f0cb2b3c8aec245 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133758460)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1

2019-10-27 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960754#comment-16960754
 ] 

Jiayi Liao commented on FLINK-14538:


[~jacobc3] This usually means the non-heap memory limit is not defined. 

1. Is this deployed on a YARN cluster or a Flink standalone cluster?
2. Could you show us the java command that starts the JobManager / TaskManager 
(something like {{/path/to/java -Xmx...}})?
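
For reference, the -1 comes from the JVM itself: {{MemoryUsage#getMax()}} returns -1 when the maximum is undefined (e.g. no explicit limit is configured for the non-heap pools), and the metric presumably just forwards that value. A minimal probe:
{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class NonHeapMaxProbe {
	public static void main(String[] args) {
		MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
		// getMax() returns -1 when the maximum is undefined, e.g. when no explicit
		// limit such as -XX:MaxMetaspaceSize is configured for the non-heap pools.
		System.out.println("NonHeap max = " + nonHeap.getMax());
	}
}
{code}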

> Metrics of Status.JVM.Memory.NonHeap.Max is always -1
> -
>
> Key: FLINK-14538
> URL: https://issues.apache.org/jira/browse/FLINK-14538
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Shuwen Zhou
>Priority: Minor
> Attachments: image-2019-10-28-12-34-53-413.png
>
>
> I'm collecting JobManager and TaskManager status metrics to DataDog.
> While all metrics of both JobManager and TaskManager 
> Status.JVM.Memory.NonHeap.Max is always -1
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10010: [FLINK-10435][yarn]Client 
sporadically hangs after Ctrl + C
URL: https://github.com/apache/flink/pull/10010#issuecomment-546787757
 
 
   
   ## CI report:
   
   * 4b0f9efdc169d3b6375d469cd3512e87e98f0f19 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133758455)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot 
sharing groups according to logical pipelined regions
URL: https://github.com/apache/flink/pull/10007#issuecomment-546673889
 
 
   
   ## CI report:
   
   * 2f64299fb74a59f77f14f20bed87191389f58317 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133706395)
   * 5b16730a6403a9cffdc564924be89121561f2e82 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133755954)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9854: [FLINK-14230][task] Change the 
endInput call of the downstream operator to after the upstream operator closes
URL: https://github.com/apache/flink/pull/9854#issuecomment-539497852
 
 
   
   ## CI report:
   
   * 9adcb150ed88bfa077642f6f590ec4ae9183baf5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/130951546)
   * 7beda6a8760882ae464701d2f9faf5ba4bb9be3a : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133562068)
   * d035936297745e9e7d6189680f0cb2b3c8aec245 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI

2019-10-27 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960752#comment-16960752
 ] 

Congxian Qiu(klion26) commented on FLINK-14264:
---

Kindly ping [~aljoscha].

If this is valid, I just want to add a property for {{stateBackend}} in 
{{CheckpointCoordinatorConfiguration}}, and pass this value when initializing 
{{CheckpointCoordinatorConfiguration}} in 
{{StreamJobGraphGenerator}}#configureCheckpointing. The backend value would be obtained 
from {{StreamGraph}}. What do you think about this?
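
A rough sketch of the kind of change being described (all class, field and accessor names below are assumptions for illustration only, not the actual Flink code):
```
// Hypothetical illustration: carry the state backend's simple class name through
// the checkpoint configuration so a REST handler can expose it as "checkpoint_backend".
public final class CheckpointConfigSketch {

    private final long checkpointTimeout;
    private final long minPauseBetweenCheckpoints;
    private final int maxConcurrentCheckpoints;
    private final String checkpointBackendName; // e.g. "FsStateBackend" (assumed value)

    public CheckpointConfigSketch(
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpoints,
            String checkpointBackendName) {
        this.checkpointTimeout = checkpointTimeout;
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        this.checkpointBackendName = checkpointBackendName;
    }

    /** Simple class name of the configured state backend, for the REST response. */
    public String getCheckpointBackendName() {
        return checkpointBackendName;
    }
}
```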

> Expose CheckpointBackend in checkpoint config RestAPI
> -
>
> Key: FLINK-14264
> URL: https://issues.apache.org/jira/browse/FLINK-14264
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, we can get checkpoint config from rest api[1], the response 
> contains the information as below
>  * timeout
>  * min_pause
>  * max_concurrent
>  * externalization
> But it did not contain the type of the CheckpointBackend. In some scenarios, we 
> want to get the CheckpointBackend type from the REST API, so this issue wants to add the 
> simple name of the CheckpointBackend to the {{checkpoints/config}} response with the 
> key {{checkpoint_backend}}, so the response will contain information such 
> as below
>  * {{timeout}}
>  * {{min_pause}}
>  * {{max_concurrent}}
>  * checkpoint_backend 
>  * externalization
>  
>  [1] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1

2019-10-27 Thread Shuwen Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuwen Zhou updated FLINK-14538:

Description: 
I'm collecting JobManager and TaskManager status metrics to DataDog.

While all metrics of both JobManager and TaskManager 
Status.JVM.Memory.NonHeap.Max is always -1

 

  was:
I'm collecting TaskManager status metrics to DataDog.

While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1

 


> Metrics of Status.JVM.Memory.NonHeap.Max is always -1
> -
>
> Key: FLINK-14538
> URL: https://issues.apache.org/jira/browse/FLINK-14538
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Shuwen Zhou
>Priority: Minor
> Attachments: image-2019-10-28-12-34-53-413.png
>
>
> I'm collecting JobManager and TaskManager status metrics to DataDog.
> While all metrics of both JobManager and TaskManager 
> Status.JVM.Memory.NonHeap.Max is always -1
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14538) Metrics of Status.JVM.Memory.NonHeap.Max is always -1

2019-10-27 Thread Shuwen Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuwen Zhou updated FLINK-14538:

Summary: Metrics of Status.JVM.Memory.NonHeap.Max is always -1  (was: 
Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1)

> Metrics of Status.JVM.Memory.NonHeap.Max is always -1
> -
>
> Key: FLINK-14538
> URL: https://issues.apache.org/jira/browse/FLINK-14538
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Shuwen Zhou
>Priority: Minor
> Attachments: image-2019-10-28-12-34-53-413.png
>
>
> I'm collecting TaskManager status metrics to DataDog.
> While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on issue #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C

2019-10-27 Thread GitBox
flinkbot commented on issue #10010: [FLINK-10435][yarn]Client sporadically 
hangs after Ctrl + C
URL: https://github.com/apache/flink/pull/10010#issuecomment-546787757
 
 
   
   ## CI report:
   
   * 4b0f9efdc169d3b6375d469cd3512e87e98f0f19 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14538) Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1

2019-10-27 Thread Shuwen Zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuwen Zhou updated FLINK-14538:

Description: 
I'm collecting TaskManager status metrics to DataDog.

While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1

 

  was:
I'm collecting TaskManager status metrics to DataDog.

While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1

!image-2019-10-28-12-34-53-413.png!


> Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1
> -
>
> Key: FLINK-14538
> URL: https://issues.apache.org/jira/browse/FLINK-14538
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Shuwen Zhou
>Priority: Minor
> Attachments: image-2019-10-28-12-34-53-413.png
>
>
> I'm collecting TaskManager status metrics to DataDog.
> While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14538) Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1

2019-10-27 Thread Shuwen Zhou (Jira)
Shuwen Zhou created FLINK-14538:
---

 Summary: Metrics of TaskManager Status.JVM.Memory.NonHeap.Max is 
always -1
 Key: FLINK-14538
 URL: https://issues.apache.org/jira/browse/FLINK-14538
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.9.0
Reporter: Shuwen Zhou
 Attachments: image-2019-10-28-12-34-53-413.png

I'm collecting TaskManager status metrics to DataDog.

While all metrics of TaskManager Status.JVM.Memory.NonHeap.Max is always -1

!image-2019-10-28-12-34-53-413.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] translate dev/stream/state/checkpointing into Chinese

2019-10-27 Thread GitBox
Henvealf commented on a change in pull request #9805: [FLINK-11635][docs-zh] 
translate dev/stream/state/checkpointing into Chinese
URL: https://github.com/apache/flink/pull/9805#discussion_r339399783
 
 

 ##
 File path: docs/dev/stream/state/checkpointing.zh.md
 ##
 @@ -173,30 +165,26 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
 
 ## Selecting a State Backend
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339398802
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+   /**
+* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
+* correctly maintained and the future callback is correctly processed.
+*/
+   @Test
+   public void testBufferAvailabilityAndFutureCallback() throws Exception {
+   final int numBuffers = 10;
+   final int numberOfSegmentsToRequest = 5;
+   final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+   final NetworkBufferPool globalPool = new NetworkBufferPool(
+   numBuffers,
+   128,
+   numberOfSegmentsToRequest,
+   requestSegmentsTimeout);
+
+   try {
+   assertTrue(globalPool.isAvailable().isDone());
+
+   List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+   for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+   MemorySegment segment = 
globalPool.requestMemorySegment();
+   assertNotNull(segment);
+   segments.add(segment);
+   assertTrue(globalPool.isAvailable().isDone());
+   }
+
+   final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
+   assertEquals(numberOfSegmentsToRequest, 
exclusiveSegments.size());
+
+   assertFalse(globalPool.isAvailable().isDone());
+   assertEquals(0, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   final AtomicBoolean completeFlag = new 
AtomicBoolean(false);
+   CompletableFuture<?> availableFuture = globalPool.isAvailable();
+   availableFuture.whenComplete((ignored, throwable) -> 
completeFlag.set(true));
+
+   // recycle one buffer
+   globalPool.recycle(segments.get(0));
+   assertTrue(completeFlag.get());
+   assertTrue(availableFuture.isDone());
+   assertTrue(globalPool.isAvailable().isDone());
+   assertEquals(1, 
globalPool.getNumberOfAvailableMemorySegments());
 
 Review comment:
   ditto: we could extract a separate test for verifying 
`globalPool.requestMemorySegments()`.
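
A rough sketch of what such a separate test could look like, assuming the same fixture as the diff above (constructor, request and recycle method names are taken from that code; the rest is illustrative):
```
@Test
public void testIsAvailableAfterRequestMemorySegments() throws Exception {
    final int numBuffers = 5;
    // the batch request size equals the pool size, so one call drains the pool
    final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, numBuffers);
    try {
        // available before any request
        assertTrue(globalPool.isAvailable().isDone());

        final List<MemorySegment> segments = globalPool.requestMemorySegments();
        assertEquals(numBuffers, segments.size());
        // all segments are handed out by the batch request
        assertFalse(globalPool.isAvailable().isDone());

        globalPool.recycleMemorySegments(segments);
        // returning the batch makes the pool available again
        assertTrue(globalPool.isAvailable().isDone());
    } finally {
        globalPool.destroy();
    }
}
```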


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb edited a comment on issue #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes

2019-10-27 Thread GitBox
sunhaibotb edited a comment on issue #9854: [FLINK-14230][task] Change the 
endInput call of the downstream operator to after the upstream operator closes
URL: https://github.com/apache/flink/pull/9854#issuecomment-546785241
 
 
   I've addressed the latest comments and added some detailed descriptions to 
the commit messages. Thanks @zhijiangW.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339398713
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+   /**
+* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
+* correctly maintained and the future callback is correctly processed.
+*/
+   @Test
+   public void testBufferAvailabilityAndFutureCallback() throws Exception {
+   final int numBuffers = 10;
+   final int numberOfSegmentsToRequest = 5;
+   final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+   final NetworkBufferPool globalPool = new NetworkBufferPool(
+   numBuffers,
+   128,
+   numberOfSegmentsToRequest,
+   requestSegmentsTimeout);
+
+   try {
+   assertTrue(globalPool.isAvailable().isDone());
+
+   List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+   for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+   MemorySegment segment = 
globalPool.requestMemorySegment();
+   assertNotNull(segment);
+   segments.add(segment);
+   assertTrue(globalPool.isAvailable().isDone());
+   }
 
 Review comment:
   It is better to extract a separate test that only verifies the logic of 
`globalPool.requestMemorySegment()`. E.g. before requesting the single segment 
the global pool is available, and after requesting it the pool becomes unavailable.
   
   This test mixes many functions to be verified together, which makes it 
difficult to trace the logic and maintain the code in the future.
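
A rough sketch of such a separate test, assuming the same fixture as the diff above (constructor and method names are taken from that code; the sizing is illustrative):
```
@Test
public void testIsAvailableAfterRequestingSingleSegment() throws Exception {
    // one segment in total, so a single request should exhaust the pool
    final NetworkBufferPool globalPool = new NetworkBufferPool(1, 128, 1);
    try {
        // available before any request
        assertTrue(globalPool.isAvailable().isDone());

        final MemorySegment segment = globalPool.requestMemorySegment();
        assertNotNull(segment);
        // the only segment is taken, so the pool reports unavailable
        assertFalse(globalPool.isAvailable().isDone());

        globalPool.recycle(segment);
        // recycling the segment makes the pool available again
        assertTrue(globalPool.isAvailable().isDone());
    } finally {
        globalPool.destroy();
    }
}
```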


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10435) Client sporadically hangs after Ctrl + C

2019-10-27 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960738#comment-16960738
 ] 

Yang Wang edited comment on FLINK-10435 at 10/28/19 4:14 AM:
-

I have tested multiple times and found the root cause. It is because the yarn 
client is closed by `YarnClusterDescriptor#close` while it is still in use 
by `DeploymentFailureHook#failSessionDuringDeployment`. So it encounters an 
exception in `yarnClient.killApplication` and will retry based on the 
policy (30 times by default). The retry process in the shutdown hook may take a long 
time.

 

Since the flink client main thread (CliFrontend -> YarnClusterDescriptor) and the 
shutdown hook thread both need the yarn client, and we cannot guarantee the order, I 
suggest removing the `yarnClient.close()` in `failSessionDuringDeployment` and 
using a new yarn client in the shutdown hook.

[~trohrmann], [~tison] I think this is a bug. What do you think?
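
A minimal sketch of that direction (this is not the actual patch; the class name and the wiring around it are assumptions, only the idea of a dedicated client for the hook comes from the comment above):
```
// Hedged sketch: the shutdown hook owns its own YarnClient, so it no longer
// depends on whether YarnClusterDescriptor#close() already closed the shared one.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;

class DeploymentFailureHookSketch extends Thread {

    private final YarnClient hookYarnClient;
    private final ApplicationId applicationId;

    DeploymentFailureHookSketch(Configuration yarnConfiguration, ApplicationId applicationId) {
        // A dedicated client for the hook, created and started up front.
        this.hookYarnClient = YarnClient.createYarnClient();
        this.hookYarnClient.init(yarnConfiguration);
        this.hookYarnClient.start();
        this.applicationId = applicationId;
    }

    @Override
    public void run() {
        try {
            hookYarnClient.killApplication(applicationId);
        } catch (Exception e) {
            // best effort during shutdown; nothing else we can do here
        } finally {
            hookYarnClient.stop();
        }
    }
}
```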


was (Author: fly_in_gis):
I have tested multiple times and found the root cause. It is because the yarn 
client is closed by `YarnClusterDescriptor#close` while it is still in use 
by `DeploymentFailureHook#failSessionDuringDeployment`. So it encounters an 
exception in `yarnClient.killApplication` and will retry based on the 
policy (30 times by default). The retry process in the shutdown hook may take a long 
time.

 

Since the flink client main thread (CliFrontend -> YarnClusterDescriptor) and the 
shutdown hook thread both need the yarn client, and we cannot guarantee the order, I 
suggest removing the `yarnClient.close()` in `failSessionDuringDeployment` and 
using a new yarn client in the shutdown hook.

[~trohrmann], I think this is a bug. What do you think?

> Client sporadically hangs after Ctrl + C
> 
>
> Key: FLINK-10435
> URL: https://issues.apache.org/jira/browse/FLINK-10435
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client, Deployment / YARN
>Affects Versions: 1.5.5, 1.6.2, 1.7.0, 1.9.1
>Reporter: Gary Yao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When submitting a YARN job cluster in attached mode, the client hangs 
> indefinitely if Ctrl + C is pressed at the right time. One can recover from 
> this by sending SIGKILL.
> *Command to submit job*
> {code}
> HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> examples/streaming/WordCount.jar
> {code}
>  
> *Output/Stacktrace*
> {code}
> [hadoop@ip-172-31-45-22 flink-1.5.4]$ HADOOP_CLASSPATH=`hadoop classpath` 
> bin/flink run -m yarn-cluster examples/streaming/WordCount.jar
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop/flink-1.5.4/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-26 12:01:04,241 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> ip-172-31-45-22.eu-central-1.compute.internal/172.31.45.22:8032
> 2018-09-26 12:01:04,386 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-26 12:01:04,386 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-26 12:01:04,402 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-09-26 12:01:04,598 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=1024, 
> taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-26 12:01:04,972 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop/flink-1.5.4/conf') contains both LOG4J 
> and Logback configuration files. Please delete or rename one of them.
> 2018-09-26 12:01:07,857 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting 
> application master application_1537944258063_0017
> 2018-09-26 12:01:07,913 

[GitHub] [flink] wangyang0918 opened a new pull request #10010: [FLINK-10435][yarn]Client sporadically hangs after Ctrl + C

2019-10-27 Thread GitBox
wangyang0918 opened a new pull request #10010: [FLINK-10435][yarn]Client 
sporadically hangs after Ctrl + C
URL: https://github.com/apache/flink/pull/10010
 
 
   
   
   ## What is the purpose of the change
   
   This PR fixes the bug that, when deploying on yarn in attached mode, the flink 
client hangs after `Ctrl + C`. More information can be found at 
[FLINK-10435](https://issues.apache.org/jira/browse/FLINK-10435).
   
   
   ## Brief change log
   
   * remove the `yarnClient.close()` in `failSessionDuringDeployment` and use a 
new yarn client in `DeploymentFailureHook`
   
   
   ## Verifying this change
   
   This change can be verified as follows:
   
   * Manually run a command to start a flink cluster on yarn in attached mode: 
`./bin/flink run -m yarn-cluster examples/streaming/WindowJoin.jar`
   * While the application is in ACCEPTED status, send `Ctrl + C`.
   * The flink client exits immediately.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10435) Client sporadically hangs after Ctrl + C

2019-10-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10435:
---
Labels: pull-request-available  (was: )

> Client sporadically hangs after Ctrl + C
> 
>
> Key: FLINK-10435
> URL: https://issues.apache.org/jira/browse/FLINK-10435
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client, Deployment / YARN
>Affects Versions: 1.5.5, 1.6.2, 1.7.0, 1.9.1
>Reporter: Gary Yao
>Priority: Major
>  Labels: pull-request-available
>
> When submitting a YARN job cluster in attached mode, the client hangs 
> indefinitely if Ctrl + C is pressed at the right time. One can recover from 
> this by sending SIGKILL.
> *Command to submit job*
> {code}
> HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> examples/streaming/WordCount.jar
> {code}
>  
> *Output/Stacktrace*
> {code}
> [hadoop@ip-172-31-45-22 flink-1.5.4]$ HADOOP_CLASSPATH=`hadoop classpath` 
> bin/flink run -m yarn-cluster examples/streaming/WordCount.jar
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop/flink-1.5.4/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-26 12:01:04,241 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> ip-172-31-45-22.eu-central-1.compute.internal/172.31.45.22:8032
> 2018-09-26 12:01:04,386 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-26 12:01:04,386 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-26 12:01:04,402 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-09-26 12:01:04,598 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=1024, 
> taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-26 12:01:04,972 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop/flink-1.5.4/conf') contains both LOG4J 
> and Logback configuration files. Please delete or rename one of them.
> 2018-09-26 12:01:07,857 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting 
> application master application_1537944258063_0017
> 2018-09-26 12:01:07,913 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1537944258063_0017
> 2018-09-26 12:01:07,913 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for 
> the cluster to be allocated
> 2018-09-26 12:01:07,916 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
> cluster, current state ACCEPTED
> ^C2018-09-26 12:01:08,851 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cancelling 
> deployment from Deployment Failure Hook
> 2018-09-26 12:01:08,854 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Killing YARN 
> application
> 
>  The program finished with the following exception:
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:410)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:258)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> 

[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339397642
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+   /**
+* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
+* correctly maintained and the future callback is correctly processed.
+*/
+   @Test
+   public void testBufferAvailabilityAndFutureCallback() throws Exception {
+   final int numBuffers = 10;
+   final int numberOfSegmentsToRequest = 5;
+   final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+   final NetworkBufferPool globalPool = new NetworkBufferPool(
+   numBuffers,
+   128,
+   numberOfSegmentsToRequest,
+   requestSegmentsTimeout);
+
+   try {
+   assertTrue(globalPool.isAvailable().isDone());
+
+   List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+   for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+   MemorySegment segment = 
globalPool.requestMemorySegment();
+   assertNotNull(segment);
+   segments.add(segment);
+   assertTrue(globalPool.isAvailable().isDone());
+   }
+
+   final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
+   assertEquals(numberOfSegmentsToRequest, 
exclusiveSegments.size());
+
+   assertFalse(globalPool.isAvailable().isDone());
+   assertEquals(0, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   final AtomicBoolean completeFlag = new 
AtomicBoolean(false);
+   CompletableFuture<?> availableFuture = globalPool.isAvailable();
+   availableFuture.whenComplete((ignored, throwable) -> 
completeFlag.set(true));
+
+   // recycle one buffer
+   globalPool.recycle(segments.get(0));
+   assertTrue(completeFlag.get());
+   assertTrue(availableFuture.isDone());
+   assertTrue(globalPool.isAvailable().isDone());
 
 Review comment:
   We could even further remove the following: 
   ```
   final AtomicBoolean completeFlag = new AtomicBoolean(false);
   availableFuture.whenComplete((ignored, throwable) -> completeFlag.set(true));
   assertTrue(completeFlag.get());
   ```
   Before recycling the global pool is not available, and after recycling it 
becomes available via `assertTrue(globalPool.isAvailable().isDone())`. That is 
already enough to verify our logic changes. It is not useful to verify the callback 
action (`completeFlag.set(true)`) when the future is completed, because in the real 
code path the action is actually different.
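
In code, the trimmed-down check would be roughly the following excerpt (an excerpt only, reusing the `globalPool` and `segments` variables from the diff above):
```
// all segments handed out -> pool reports unavailable
assertFalse(globalPool.isAvailable().isDone());

// give one segment back
globalPool.recycle(segments.get(0));

// availability flips back without any callback bookkeeping
assertTrue(globalPool.isAvailable().isDone());
```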


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-27 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284001
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
 ##
 @@ -312,6 +381,60 @@ private static void generateRandomDirs(File dir, int 
numFiles, int numDirs, int
}
}
 
+   /**
+* Generate some files in the directory {@code dir}.
+* @param dir the directory where the files are generated
+* @return Tuple3 holding the generated files' absolute path, relative 
to the working directory path and relative
+* url.
+* @throws IOException if I/O error occurs while generating the files
+*/
+   public static Tuple3<Collection<File>, Collection<File>, Collection<URL>> prepareTestFiles(
+   final java.nio.file.Path dir) throws IOException {
+
+   Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = new Tuple3<>();
+
+   result.f0 = generateSomeFilesInDirectoryReturnJarFiles(dir);
+   result.f1 = toRelativeFiles(result.f0);
+   result.f2 = toRelativeURLs(result.f1);
+
+   return result;
+   }
+
+   private static Collection<File> generateSomeFilesInDirectoryReturnJarFiles(
+   final java.nio.file.Path dir) throws IOException {
+
+   final java.nio.file.Path jobSubDir1 = 
Files.createDirectory(dir.resolve("_sub_dir1"));
+   final java.nio.file.Path jobSubDir2 = 
Files.createDirectory(dir.resolve("_sub_dir2"));
+   final java.nio.file.Path jarFile1 = 
Files.createFile(dir.resolve("file1.jar"));
+   final java.nio.file.Path jarFile2 = 
Files.createFile(dir.resolve("file2.jar"));
+   final java.nio.file.Path jarFile3 = 
Files.createFile(jobSubDir1.resolve("file3.jar"));
+   final java.nio.file.Path jarFile4 = 
Files.createFile(jobSubDir2.resolve("file4.jar"));
+   final Collection<File> jarFiles = new ArrayList<>();
+
+   Files.createFile(dir.resolve("file1.txt"));
+   Files.createFile(jobSubDir2.resolve("file2.txt"));
+
+   jarFiles.add(jarFile1.toFile());
+   jarFiles.add(jarFile2.toFile());
+   jarFiles.add(jarFile3.toFile());
+   jarFiles.add(jarFile4.toFile());
+   return jarFiles;
+   }
+
+   private static Collection<File> toRelativeFiles(Collection<File> files) {
+   final java.nio.file.Path workingDir = 
Paths.get(System.getProperty("user.dir"));
+   final Collection<File> relativeFiles = new ArrayList<>();
+   files.forEach(file -> 
relativeFiles.add(workingDir.relativize(file.toPath()).toFile()));
+   return relativeFiles;
+   }
+
+   private static Collection<URL> toRelativeURLs(Collection<File> relativeFiles) throws MalformedURLException {
+   final Collection<URL> relativeURLs = new ArrayList<>();
+   final URL context = new 
URL(relativeFiles.iterator().next().toURI().getScheme() + ":");
+   relativeFiles.forEach(FunctionUtils.uncheckedConsumer(file -> relativeURLs.add(new URL(context, file.toString()))));
 
 Review comment:
   maybe file.toString() -> file.getPath()?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot 
sharing groups according to logical pipelined regions
URL: https://github.com/apache/flink/pull/10007#issuecomment-546673889
 
 
   
   ## CI report:
   
   * 2f64299fb74a59f77f14f20bed87191389f58317 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133706395)
   * 5b16730a6403a9cffdc564924be89121561f2e82 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133755954)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339397163
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+   /**
+* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
+* correctly maintained and the future callback is correctly processed.
+*/
+   @Test
+   public void testBufferAvailabilityAndFutureCallback() throws Exception {
+   final int numBuffers = 10;
+   final int numberOfSegmentsToRequest = 5;
+   final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+   final NetworkBufferPool globalPool = new NetworkBufferPool(
+   numBuffers,
+   128,
+   numberOfSegmentsToRequest,
+   requestSegmentsTimeout);
+
+   try {
+   assertTrue(globalPool.isAvailable().isDone());
+
+   List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+   for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+   MemorySegment segment = 
globalPool.requestMemorySegment();
+   assertNotNull(segment);
+   segments.add(segment);
+   assertTrue(globalPool.isAvailable().isDone());
+   }
+
+   final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
+   assertEquals(numberOfSegmentsToRequest, 
exclusiveSegments.size());
+
+   assertFalse(globalPool.isAvailable().isDone());
+   assertEquals(0, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   final AtomicBoolean completeFlag = new 
AtomicBoolean(false);
+   CompletableFuture<?> availableFuture = globalPool.isAvailable();
+   availableFuture.whenComplete((ignored, throwable) -> 
completeFlag.set(true));
+
+   // recycle one buffer
+   globalPool.recycle(segments.get(0));
+   assertTrue(completeFlag.get());
+   assertTrue(availableFuture.isDone());
+   assertTrue(globalPool.isAvailable().isDone());
 
 Review comment:
   ```
   assertTrue(availableFuture.isDone());
   assertTrue(globalPool.isAvailable().isDone());
   ```
   These two assertions check the same thing, so we could remove one of them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339396841
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+   /**
+* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
+* correctly maintained and the future callback is correctly processed.
+*/
+   @Test
+   public void testBufferAvailabilityAndFutureCallback() throws Exception {
+   final int numBuffers = 10;
+   final int numberOfSegmentsToRequest = 5;
+   final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+   final NetworkBufferPool globalPool = new NetworkBufferPool(
 
 Review comment:
   We could use the simple constructor `NetworkBufferPool(int 
numberOfSegmentsToAllocate, int segmentSize, int numberOfSegmentsToRequest)` 
for tests instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9993: [FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool.

2019-10-27 Thread GitBox
zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r339396836
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##
 @@ -518,4 +528,176 @@ public void go() throws Exception {
globalPool.destroy();
}
}
+
+   /**
+* Tests {@link NetworkBufferPool#isAvailable()}, verifying that the 
buffer availability is
+* correctly maintained and the future callback is correctly processed.
+*/
+   @Test
+   public void testBufferAvailabilityAndFutureCallback() throws Exception {
+   final int numBuffers = 10;
+   final int numberOfSegmentsToRequest = 5;
+   final Duration requestSegmentsTimeout = Duration.ofSeconds(10L);
+
+   final NetworkBufferPool globalPool = new NetworkBufferPool(
+   numBuffers,
+   128,
+   numberOfSegmentsToRequest,
+   requestSegmentsTimeout);
+
+   try {
+   assertTrue(globalPool.isAvailable().isDone());
+
+   List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
+   for (int i = 0; i < numberOfSegmentsToRequest; ++i) {
+   MemorySegment segment = 
globalPool.requestMemorySegment();
+   assertNotNull(segment);
+   segments.add(segment);
 
 Review comment:
   `assertNotNull(segment)` could be done via 
`segments.add(checkNotNull(segment))`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add 
methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339396074
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/UnresolvedIdentifier.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Identifier of an object, such as table, view, function or type in a 
catalog. This identifier
+ * cannot be used directly to access an object in a {@link CatalogManager}, 
but has to be first
+ * fully resolved into {@link ObjectIdentifier}.
+ */
+@PublicEvolving
+public class UnresolvedIdentifier {
 
 Review comment:
   Some thoughts about this class:
   1. Should it be `@Internal`? I don't find any public interface or class 
referencing it yet. Otherwise, I would suggest putting it in 
`flink-table-common`.
   2. I am a bit confused by the prefix `Unresolved`. Normally we would do 
some validation check to make it a "resolved" identifier, but according to the 
implementation we just expand the path. It doesn't sound like it gets resolved yet, in my 
opinion. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339395859
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit still be retained:
+  * 1.If the source is required to return the exact number of limit number, 
the implementation
+  * of the source is highly required. The source is required to accurately 
control the record
+  * number of split, and the parallelism setting also need to be adjusted 
accordingly.
+  * 2.When remove the limit, maybe filter will be pushed down to the source 
after limit pushed
+  * down. The source need know it should do limit first and do the filter 
later, it is hard to
+  * implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+operand(classOf[FlinkLogicalTableSourceScan], none)), 
"PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push-down the limit whose offset equal zero. Because it is 
difficult to source based
+// push to handle the non-zero offset. And the non-zero offset usually 
appear together with
+// sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
 
 Review comment:
   I think having an offset doesn't matter, because the planner will apply the 
offset anyway (just like we apply the limit anyway). Say we have `limit 10 
offset 1`; I think we can push down `limit 11` and do the `limit 10 offset 1` 
after the source.
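
A tiny sketch of the arithmetic being proposed (a hypothetical helper for illustration, not the rule's actual code):
```
// Push (offset + fetch) rows down to the source and keep the original
// limit/offset above the scan; e.g. offset 1, fetch 10 -> push down 11.
static long pushedDownLimit(long offset, long fetch) {
    return offset + fetch;
}
```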


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339389965
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limiting push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number 
of records.
+ *
+ * After pushing down, source only needs to try its best to limit the 
number of output records,
+ * but does not need to guarantee that the number must be less than or equal 
to the limit.
 
 Review comment:
   Could you add more description here about the split limit and the planner guarantee 
(they are mentioned in the rule javadoc)? I think these would be useful to users.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339392752
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit still be retained:
+  * 1.If the source is required to return the exact number of limit number, 
the implementation
+  * of the source is highly required. The source is required to accurately 
control the record
+  * number of split, and the parallelism setting also need to be adjusted 
accordingly.
+  * 2.When remove the limit, maybe filter will be pushed down to the source 
after limit pushed
+  * down. The source need know it should do limit first and do the filter 
later, it is hard to
+  * implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+operand(classOf[FlinkLogicalTableSourceScan], none)), 
"PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push-down the limit whose offset equal zero. Because it is 
difficult to source based
+// push to handle the non-zero offset. And the non-zero offset usually 
appear together with
+// sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = 
newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = 
relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+&& 
newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
+  + "table source with pushdown capability must override and change "
+  + "explainSource() API to explain the pushdown applied!")
+}
+
+call.transformTo(sort.copy(sort.getTraitSet, 
Collections.singletonList(newScan)))

[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339395422
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, 
FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit still be retained:
+  * 1.If the source is required to return the exact number of limit number, 
the implementation
+  * of the source is highly required. The source is required to accurately 
control the record
+  * number of split, and the parallelism setting also need to be adjusted 
accordingly.
+  * 2.When remove the limit, maybe filter will be pushed down to the source 
after limit pushed
+  * down. The source need know it should do limit first and do the filter 
later, it is hard to
+  * implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+operand(classOf[FlinkLogicalTableSourceScan], none)), 
"PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push-down the limit whose offset equal zero. Because it is 
difficult to source based
+// push to handle the non-zero offset. And the non-zero offset usually 
appear together with
+// sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = 
newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = 
relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+&& 
newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
 
 Review comment:
   Actually, I have some concerns about this. I also noticed that we do similar 
things in the filter and projection push-down rules. However, I think it 
pushes some dirty work to connectors and makes connectors more difficult to 
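
   To make the trade-off concrete, here is a rough Java sketch of what a connector 
ends up implementing for this rule. The class and its data handling are made up 
for illustration; only the `LimitableTableSource` interface (`applyLimit` and 
`isLimitPushedDown`) comes from this PR. The sketch treats the pushed-down limit 
as a best-effort reading hint, which is exactly why the planner keeps the 
original limit on top of the scan.

```java
import java.util.List;

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

/** Hypothetical source: the pushed-down limit is only a best-effort reading hint. */
public class LimitHintTableSource implements StreamTableSource<Row>, LimitableTableSource<Row> {

	private final List<Row> data;
	private final RowTypeInfo rowType;
	private final long limit; // < 0 means no limit has been pushed down yet

	public LimitHintTableSource(List<Row> data, RowTypeInfo rowType, long limit) {
		this.data = data;
		this.rowType = rowType;
		this.limit = limit;
	}

	@Override
	public boolean isLimitPushedDown() {
		// Tells the push-down rule not to fire a second time.
		return limit >= 0;
	}

	@Override
	public TableSource<Row> applyLimit(long limit) {
		// Return a copy carrying the limit; the retained LIMIT above the scan
		// still trims the final result, so reading "roughly limit rows" is enough.
		return new LimitHintTableSource(data, rowType, limit);
	}

	@Override
	public boolean isBounded() {
		return true;
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// Best-effort trimming; exact trimming is done by the retained LIMIT.
		List<Row> rows = (limit >= 0 && limit < data.size()) ? data.subList(0, (int) limit) : data;
		return execEnv.fromCollection(rows, rowType);
	}

	@Override
	public TableSchema getTableSchema() {
		return TableSchema.fromTypeInfo(rowType);
	}

	@Override
	public RowTypeInfo getReturnType() {
		return rowType;
	}
}
```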

[GitHub] [flink] wuchong commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339393494
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A table source that supports pushing a limit down into the source.
+  */
+class TestLimitableTableSource(
+data: Seq[Row],
+rowType: RowTypeInfo,
+var limit: Long = -1,
+var limitablePushedDown: Boolean = false)
+  extends StreamTableSource[Row]
+  with LimitableTableSource[Row] {
+
+  override def isBounded = true
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): 
DataStream[Row] = {
+val dataSet = if (limit > 0) {
+  data.take(limit.toInt).asJava
+} else {
+  data.asJava
+}
+execEnv.createInput(
+  new CollectionInputFormat(dataSet, rowType.createSerializer(new 
ExecutionConfig)),
+  rowType)
+  }
+
+  override def applyLimit(limit: Long): TableSource[Row] = {
+new TestLimitableTableSource(data, rowType, limit, limitablePushedDown)
+  }
+
+  override def isLimitPushedDown: Boolean = limitablePushedDown
+
+  override def getReturnType: TypeInformation[Row] = rowType
+
+  override def explainSource(): String = {
+if (limit > 0 && limit < Long.MaxValue) {
+  "limit: " + limit
+} else if (limitablePushedDown) {
+  "limitablePushedDown"
 
 Review comment:
   I think we will never reach here, right? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339395831
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -501,6 +537,11 @@ public void alterTable(CatalogBaseTable table, 
ObjectIdentifier objectIdentifier
 *  does not exist.
 */
public void dropTable(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+   if (temporaryTables.containsKey(objectIdentifier)) {
 
 Review comment:
   Just curious about this, I took a look at Spark:
   ```
  * If a database is specified in `name`, this will drop the table from 
that database.
  * If no database is specified, this will first attempt to drop a 
temporary view with
  * the same name, then, if that does not exist, drop the table from the 
current database.
   ```
   Curious about other databases, I don't know how they behave.
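
   For comparison, the Spark behaviour quoted above boils down to roughly this 
sketch (plain maps stand in for the temporary registry and the catalog; this is 
not the actual `CatalogManager` code):

```java
import java.util.HashMap;
import java.util.Map;

/** Hedged sketch of "temporary objects first" drop semantics; names are illustrative. */
class TemporaryFirstDropExample {

	private final Map<String, String> temporaryTables = new HashMap<>();
	private final Map<String, String> permanentTables = new HashMap<>();

	void dropTable(String identifier, boolean ignoreIfNotExists) {
		// A temporary object shadows the permanent one, so it is dropped first.
		if (temporaryTables.remove(identifier) != null) {
			return;
		}
		// Otherwise fall back to the permanent table in the catalog.
		if (permanentTables.remove(identifier) == null && !ignoreIfNotExists) {
			throw new IllegalArgumentException("Table " + identifier + " does not exist.");
		}
	}
}
```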


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339393920
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 ##
 @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
 * Searches for the specified table source, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSource(String name) {
Preconditions.checkNotNull(name);
TableSource tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-   tableEnv.registerTableSource(name, tableSource);
+   registration.createTableSource(name, tableSource);
}
 
/**
 * Searches for the specified table sink, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSink(String name) {
Preconditions.checkNotNull(name);
TableSink tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-   tableEnv.registerTableSink(name, tableSink);
+   registration.createTableSink(name, tableSink);
}
 
/**
 * Searches for the specified table source and sink, configures them 
accordingly, and registers
 * them as a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSourceAndSink(String name) {
registerTableSource(name);
registerTableSink(name);
}
 
+   /**
+* Registers the table described by underlying properties in a given 
path.
+*
+* There is no distinction between source and sink at the descriptor 
level anymore as this
+* method does not perform actual class lookup. It only stores the 
underlying properties. The
+* actual source/sink lookup is performed when the table is used.
+*
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
+* NOTE: The schema must be explicitly defined.
+*
+* @param path path where to register the temporary table
+*/
+   public void createTemporaryTable(String path) {
+   if (schemaDescriptor == null) {
+   throw new TableException(
+   "Table schema must be explicitly defined. To 
derive schema from the underlying connector" +
+   " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+   }
+
+   Map schemaProperties = 
schemaDescriptor.toProperties();
+   TableSchema tableSchema = getTableSchema(schemaProperties);
+
+   Map properties = new HashMap<>(toProperties());
+   schemaProperties.keySet().forEach(properties::remove);
+
+   CatalogTableImpl catalogTable = new CatalogTableImpl(
 
 Review comment:
   `CatalogTable` has `partitionKeys` too; consider adding the partition keys in 
`CatalogTableImpl.toProperties` and parsing them here?
   (We can also add this later.)


This is an automated message from the Apache Git Service.

[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339392415
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -73,7 +74,14 @@ abstract class TableEnvImpl(
 new TableReferenceLookup {
   override def lookupTable(name: String): 
Optional[TableReferenceExpression] = {
 JavaScalaConversionUtil
-  .toJava(scanInternal(Array(name)).map(t => new 
TableReferenceExpression(name, t)))
+  .toJava(
+Try({
+  val unresolvedIdentifier = UnresolvedIdentifier.of(name)
 
 Review comment:
   Why use `Try`? If it is an illegal name, should we throw an exception?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339392857
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -363,15 +371,26 @@ abstract class TableEnvImpl(
 
   @throws[TableException]
   override def scan(tablePath: String*): Table = {
-scanInternal(tablePath.toArray) match {
+val unresolvedIdentifier = UnresolvedIdentifier.of(tablePath: _*)
+scanInternal(unresolvedIdentifier) match {
   case Some(table) => createTable(table)
   case None => throw new TableException(s"Table 
'${tablePath.mkString(".")}' was not found.")
 }
   }
 
-  private[flink] def scanInternal(tablePath: Array[String]): 
Option[CatalogQueryOperation] = {
-val unresolvedIdentifier = UnresolvedIdentifier.of(tablePath: _*)
-val objectIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier)
+  override def from(path: String): Table = {
+val parser = planningConfigurationBuilder.createCalciteParser()
+val unresolvedIdentifier = 
UnresolvedIdentifier.of(parser.parseIdentifier(path).names: _*)
+scanInternal(unresolvedIdentifier) match {
+  case Some(table) => createTable(table)
+  case None => throw new TableException(s"Table '$path' was not found.")
 
 Review comment:
   Can we have a unified exception message? Use `unresolvedIdentifier`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339393982
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 ##
 @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
 * Searches for the specified table source, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSource(String name) {
Preconditions.checkNotNull(name);
TableSource tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-   tableEnv.registerTableSource(name, tableSource);
+   registration.createTableSource(name, tableSource);
}
 
/**
 * Searches for the specified table sink, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSink(String name) {
Preconditions.checkNotNull(name);
TableSink tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-   tableEnv.registerTableSink(name, tableSink);
+   registration.createTableSink(name, tableSink);
}
 
/**
 * Searches for the specified table source and sink, configures them 
accordingly, and registers
 * them as a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSourceAndSink(String name) {
registerTableSource(name);
registerTableSink(name);
}
 
+   /**
+* Registers the table described by underlying properties in a given 
path.
+*
+* There is no distinction between source and sink at the descriptor 
level anymore as this
+* method does not perform actual class lookup. It only stores the 
underlying properties. The
+* actual source/sink lookup is performed when the table is used.
+*
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
+* NOTE: The schema must be explicitly defined.
+*
+* @param path path where to register the temporary table
+*/
+   public void createTemporaryTable(String path) {
+   if (schemaDescriptor == null) {
+   throw new TableException(
+   "Table schema must be explicitly defined. To 
derive schema from the underlying connector" +
+   " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+   }
+
+   Map schemaProperties = 
schemaDescriptor.toProperties();
+   TableSchema tableSchema = getTableSchema(schemaProperties);
+
+   Map properties = new HashMap<>(toProperties());
+   schemaProperties.keySet().forEach(properties::remove);
+
+   CatalogTableImpl catalogTable = new CatalogTableImpl(
 
 Review comment:
   Maybe we can use `CatalogTableBuilder` here?
   Looks like there are some bugs in `CatalogTableBuilder`; it does not remove 
`schemaProperties` from `properties`?
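
   For reference, a minimal usage sketch of the `createTemporaryTable` method 
added in this PR. The file-system connector, CSV format and table path are 
placeholders; the registration itself does not trigger a factory lookup because 
the source/sink lookup is deferred until the table is actually used.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;

public class CreateTemporaryTableExample {
	public static void main(String[] args) {
		TableEnvironment tableEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().inStreamingMode().build());

		// The schema has to be given explicitly; the actual source/sink lookup
		// only happens later, when the temporary table is used in a query.
		tableEnv.connect(new FileSystem().path("/tmp/words.csv"))
			.withFormat(new OldCsv().field("word", "STRING"))
			.withSchema(new Schema().field("word", "STRING"))
			.createTemporaryTable("default_catalog.default_database.words");
	}
}
```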


This is an automated message from the Apache Git Service.
To 

[GitHub] [flink] flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more exceptions by query parameter

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9859: [FLINK-11405][rest]rest api can more 
exceptions by query parameter
URL: https://github.com/apache/flink/pull/9859#issuecomment-539878774
 
 
   
   ## CI report:
   
   * c8bf66050c865904c402bdc9c079a6e4de1a064c : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131080663)
   * ec84f32bdcae0738d97ac60090691789334a279a : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133306001)
   * 6a12e5941baf73958146f54365ff3f267c696e11 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133310272)
   * 4318f07b5554bcf1abc44e912345927c65c8f8a1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133323118)
   * 052e4580c08f9723e2716dedac7d26d031c4b538 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366819)
   * 27cd52fadf153982bc9771ba27a436caeebb17b8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133480918)
   * f44e43be2046b4d86a86891bd16189e91ddbe2ee : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] 
Add hash distribution and sort grouping only when dynamic partition insert
URL: https://github.com/apache/flink/pull/9796#issuecomment-536253430
 
 
   
   ## CI report:
   
   * 3b84408bb5ee764b1a64103b1f4df3e0ceacaba7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129578380)
   * 681339af85489fc3fb2274371067aa2de269e621 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129582634)
   * 61c5f74d60d7673ccb8e4d57812bf1d95c876f1f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131283509)
   * 73d33a1b8588776741c8ee1a5030b10ea7cdb580 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133751387)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot sharing groups according to logical pipelined regions

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10007: [FLINK-14060][runtime] Set slot 
sharing groups according to logical pipelined regions
URL: https://github.com/apache/flink/pull/10007#issuecomment-546673889
 
 
   
   ## CI report:
   
   * 2f64299fb74a59f77f14f20bed87191389f58317 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133706395)
   * 5b16730a6403a9cffdc564924be89121561f2e82 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
KurtYoung commented on a change in pull request #9971: [FLINK-14490][table] Add 
methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339394157
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
 ##
 @@ -179,15 +154,7 @@ class FlinkPlannerImpl(
 schemaPath: util.List[String],
 viewPath: util.List[String]): RelRoot = {
 
-  val parser: SqlParser = SqlParser.create(queryString, parserConfig)
-  var sqlNode: SqlNode = null
-  try {
-sqlNode = parser.parseQuery
-  }
-  catch {
-case e: CSqlParseException =>
-  throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
-  }
+  val sqlNode: SqlNode = parser.parse(queryString)
 
 Review comment:
   Is it intended to re-use the parser here? I noticed in `PlannerBase::parser` you 
pointed out that you didn't want to reuse the parser since the config might change.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] 
Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#issuecomment-546558980
 
 
   
   ## CI report:
   
   * 74748e2c09f1b2b0b9bfb3c1850cd71696d30df8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133648556)
   * 407af42116be8c51f02403660e00e9f490bd7454 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133751372)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9915: [FLINK-14415][table-common] ValueLiteralExpression#equals should take array value into account

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9915: [FLINK-14415][table-common] 
ValueLiteralExpression#equals should take array value into account
URL: https://github.com/apache/flink/pull/9915#issuecomment-542752172
 
 
   
   ## CI report:
   
   * d6f66852e76bac070b37e7e9c7365423e5b66853 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/132183809)
   * 2e16ed14f272922207b947b6b16befbda74520d8 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9804: [FLINK-14239] Fix the max watermark in StreamSource may arrive the downstream operator early

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9804: [FLINK-14239] Fix the max watermark 
in StreamSource may arrive the downstream operator early
URL: https://github.com/apache/flink/pull/9804#issuecomment-536287116
 
 
   
   ## CI report:
   
   * cb5b4da4d8b5d877e296dd84095a328208263c15 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129588487)
   * 2906e837e6a7f70dda5bc112658b51157801150e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/129624664)
   * 9e84fcb1dc2b92d87010bbc8ba92fd9a508a27d5 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129632305)
   * 0c2e92b518e88c88b90c876cc8d20e688c51cecc : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/130935417)
   * 6780c04adab7c716882706600c1d3ff1af118b5b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/131060082)
   * a490f41f23e59f4e9a4c10e6c7c5ddf66e25737b : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] 
Add hash distribution and sort grouping only when dynamic partition insert
URL: https://github.com/apache/flink/pull/9796#issuecomment-536253430
 
 
   
   ## CI report:
   
   * 3b84408bb5ee764b1a64103b1f4df3e0ceacaba7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129578380)
   * 681339af85489fc3fb2274371067aa2de269e621 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129582634)
   * 61c5f74d60d7673ccb8e4d57812bf1d95c876f1f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131283509)
   * 73d33a1b8588776741c8ee1a5030b10ea7cdb580 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133751387)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
wuchong commented on a change in pull request #10001: 
[FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#discussion_r339387673
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -505,7 +503,7 @@ object AggregateUtil extends Enumeration {
   case VARCHAR | CHAR => 
fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE)
 
 Review comment:
   +1 
   I think this is legacy code. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #10001: 
[FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#discussion_r339387049
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -505,7 +503,7 @@ object AggregateUtil extends Enumeration {
   case VARCHAR | CHAR => 
fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE)
 
 Review comment:
   Can you change this to 
`DataTypes.VARCHAR(precision).bridgeTo(BinaryString.class)`, I think that is 
the right way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #10001: 
[FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#discussion_r339387049
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 ##
 @@ -505,7 +503,7 @@ object AggregateUtil extends Enumeration {
   case VARCHAR | CHAR => 
fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE)
 
 Review comment:
   Can you change this to 
`DataTypes.VARCHAR(precision).bridgeTo(BinaryString.class)` and 
`DataTypes.CHAR(precision).bridgeTo(BinaryString.class)`, I think that is the 
right way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tony810430 commented on issue #9824: [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when ena

2019-10-27 Thread GitBox
tony810430 commented on issue #9824: [FLINK-14302] [kafka] 
FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
`newPartitionsInTransaction` is empty when enable EoS
URL: https://github.com/apache/flink/pull/9824#issuecomment-546769755
 
 
   hi @becketqin, do you have free time to review this PR? I have done the test 
as you suggested.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on issue #9972: [FLINK-14496][client] Exclude detach flag from ClusterClient

2019-10-27 Thread GitBox
TisonKun commented on issue #9972: [FLINK-14496][client] Exclude detach flag 
from ClusterClient
URL: https://github.com/apache/flink/pull/9972#issuecomment-546768430
 
 
   Hi @aljoscha & @kl0u , I'm glad to hear your ideas :P


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #9915: [FLINK-14415][table-common] ValueLiteralExpression#equals should take array value into account

2019-10-27 Thread GitBox
wuchong commented on issue #9915: [FLINK-14415][table-common] 
ValueLiteralExpression#equals should take array value into account
URL: https://github.com/apache/flink/pull/9915#issuecomment-546768292
 
 
   Thanks @dawidwys , I will update and merge this then.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-14535) Fix distinct key type for DecimalType in DistinctInfo

2019-10-27 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-14535:
---

Assignee: Zhenghua Gao

> Fix distinct key type for DecimalType in DistinctInfo
> -
>
> Key: FLINK-14535
> URL: https://issues.apache.org/jira/browse/FLINK-14535
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> DecimalType in DistinctInfo is bridged to the wrong external BigDecimal type, 
> which causes count distinct on decimal types to fail.
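
Presumably the fix bridges the distinct key to the planner's internal decimal 
class instead of BigDecimal, in the same spirit as the VARCHAR/CHAR suggestion 
from the review above. A hedged sketch: only the VARCHAR line is based on the 
review comment, the Decimal line is an assumption about the fix, and the actual 
API method is spelled `bridgedTo`.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.types.DataType;

/** Hedged sketch: distinct-key data types bridged to the blink planner's internal classes. */
public class DistinctKeyDataTypes {

	// Based on the review suggestion above.
	public static DataType varcharKey(int precision) {
		return DataTypes.VARCHAR(precision).bridgedTo(BinaryString.class);
	}

	// Assumption about the DecimalType fix, by analogy; not quoted from the PR.
	public static DataType decimalKey(int precision, int scale) {
		return DataTypes.DECIMAL(precision, scale).bridgedTo(Decimal.class);
	}
}
```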



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
wuchong edited a comment on issue #10001: [FLINK-14535][table-planner-blink] 
Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#issuecomment-546767439
 
 
   Hi @docete , thanks for the contribution. The change looks good to me. 
   
   I think we can improve the tests a bit more. Could you add an integration 
test that covers count distinct on all types? That way, we can merge 
`testTimeDistinct`, `testDateDistinct`, `testTimestampDistinct` and 
`testCountDistinct` into one. And we should add the same test in 
`SplitAggregateITCase`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
wuchong commented on issue #10001: [FLINK-14535][table-planner-blink] Fix 
distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#issuecomment-546767439
 
 
   Hi @docete , thanks for the contribution. The change looks good to me. 
   I think we can improve the tests a bit more. Could you add an integration 
test that covers count distinct on all types? That way, we can merge 
`testTimeDistinct`, `testDateDistinct`, `testTimestampDistinct` and 
`testCountDistinct` into one. And we should add the same test in 
`SplitAggregateITCase`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-27 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339385051
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *  Abstract class for a JobGraphRetriever that needs the classpath the user's 
code depends on.
+ */
+public abstract class AbstractUserClassPathJobGraphRetriever implements 
JobGraphRetriever {
+
+   /* A collection of jar paths relative to the working directory. */
+   private final List userClassPaths;
+
+   protected AbstractUserClassPathJobGraphRetriever(@Nullable final File 
jobDir) throws IOException {
+   if (jobDir == null) {
+   userClassPaths = Collections.emptyList();
+   } else {
+   final Collection jarFiles = 
FileUtils.listFilesInPath(jobDir, file -> file.getName().endsWith(".jar"));
+   final Collection relativeFiles = 
FileUtils.relativizeToWorkingDir(jarFiles);
+   this.userClassPaths = new 
ArrayList<>(FileUtils.toRelativeURLs(relativeFiles));
 
 Review comment:
   Or do we need a specific rule for ordering the classpaths if we'd like it to be 
a List?
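
   If a deterministic order is wanted, one possible rule is simply lexicographic 
order of the URLs, as in this sketch (the helper and its placement are 
hypothetical, not part of the PR):

```java
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper: one possible ordering rule is plain lexicographic order. */
final class UserClassPathOrdering {

	private UserClassPathOrdering() {}

	/** Returns the user classpath URLs in a deterministic (sorted) order. */
	static List<URL> sortDeterministically(Collection<URL> userClassPaths) {
		final List<URL> sorted = new ArrayList<>(userClassPaths);
		// File listing order is OS/file-system dependent; sorting makes repeated
		// runs build identical classpaths regardless of listing order.
		sorted.sort(Comparator.comparing(URL::toString));
		return sorted;
	}
}
```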


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #9927: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments

2019-10-27 Thread GitBox
lirui-apache commented on issue #9927: [FLINK-14397][hive] Failed to run Hive 
UDTF with array arguments
URL: https://github.com/apache/flink/pull/9927#issuecomment-546766553
 
 
   @bowenli86 Has this one been merged?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] Add hash distribution and sort grouping only when dynamic partition insert

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #9796: [FLINK-14253][table-planner-blink] 
Add hash distribution and sort grouping only when dynamic partition insert
URL: https://github.com/apache/flink/pull/9796#issuecomment-536253430
 
 
   
   ## CI report:
   
   * 3b84408bb5ee764b1a64103b1f4df3e0ceacaba7 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129578380)
   * 681339af85489fc3fb2274371067aa2de269e621 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/129582634)
   * 61c5f74d60d7673ccb8e4d57812bf1d95c876f1f : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/131283509)
   * 73d33a1b8588776741c8ee1a5030b10ea7cdb580 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] Fix distinct key type for DecimalT…

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10001: [FLINK-14535][table-planner-blink] 
Fix distinct key type for DecimalT…
URL: https://github.com/apache/flink/pull/10001#issuecomment-546558980
 
 
   
   ## CI report:
   
   * 74748e2c09f1b2b0b9bfb3c1850cd71696d30df8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133648556)
   * 407af42116be8c51f02403660e00e9f490bd7454 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-13839) Support to set yarn node label for flink jobmanager and taskmanager container

2019-10-27 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang closed FLINK-13839.
-
Resolution: Duplicate

> Support to set yarn node label for flink jobmanager and taskmanager container
> -
>
> Key: FLINK-13839
> URL: https://issues.apache.org/jira/browse/FLINK-13839
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / YARN
>Reporter: Yang Wang
>Priority: Major
>
> The Yarn node label feature was introduced in 2.6. It is a way to group nodes 
> with similar characteristics so that applications can specify where to run. In 
> a production or cloud environment, we want the jobmanager to run on more 
> stable machines. Node labels could help us achieve that.
>  
> However, ResourceRequest.setNodeLabelExpression is not supported by the current 
> hadoop dependency (2.4.1). So we need to bump the hadoop version to 2.6.5.
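
For illustration, a hedged sketch of the Hadoop 2.6+ API the issue refers to; 
the priority, resource size and the label name "stable" are placeholders, not 
values from any Flink code.

```java
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class NodeLabelRequestExample {

	/** Builds a container request pinned to nodes carrying a given label. */
	public static ResourceRequest jobManagerRequest() {
		ResourceRequest request = ResourceRequest.newInstance(
			Priority.newInstance(0),        // priority of the JobManager container
			ResourceRequest.ANY,            // no specific host
			Resource.newInstance(1024, 1),  // 1 GB, 1 vcore
			1);                             // one container
		// Restrict the container to nodes carrying the "stable" label (placeholder name).
		request.setNodeLabelExpression("stable");
		return request;
	}
}
```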



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-12675) Event time synchronization in Kafka consumer

2019-10-27 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960301#comment-16960301
 ] 

Jiayi Liao edited comment on FLINK-12675 at 10/28/19 12:03 AM:
---

[~thw] I've drafted a design for this. It follows your design for the Kinesis 
source, but there are still a lot of changes due to the differences between the 
two connectors' processing models. Could you take a look if you can spare the time?

https://docs.google.com/document/d/1d4XhVK4BD9GGNqhfSk_U30nEYOXx8THCHhR77RNbIbA/edit?usp=sharing


was (Author: wind_ljy):
[~thw] I've drafted a design for this. It follows your design for the Kinesis 
source, but there are still a lot of changes due to the differences between the 
two connectors' processing models. Could you take a look if you can spare the time?

https://docs.google.com/document/d/1d4XhVK4BD9GGNqhfSk_U30nEYOXx8THCHhR77RNbIbA/edit#

> Event time synchronization in Kafka consumer
> 
>
> Key: FLINK-12675
> URL: https://issues.apache.org/jira/browse/FLINK-12675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> Integrate the source watermark tracking into the Kafka consumer and implement 
> the sync mechanism (different consumer model, compared to Kinesis).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task 
starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133733201)
   * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133738302)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task 
starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133733201)
   * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133738302)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task 
starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133733201)
   * fbfcab3e7a1129007a0ccf491e0b91a6c3269bcb : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task 
starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133733201)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless 
NotifyCheckpointComplete and TriggerCheckpoint
URL: https://github.com/apache/flink/pull/10008#issuecomment-546719490
 
 
   
   ## CI report:
   
   * 4e9fe99659cd1826b676b87a1029453fc46edccb : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133729777)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10009: [FLINK-14304] WIP -- Avoid task 
starvation with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133732010)
   * 36d23db6c3e89c7044d4eaf916661745c69c349b : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation 
with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546724255
 
 
   
   ## CI report:
   
   * 2b39236d76b1c56d240f2476c535568f0614c577 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
flinkbot commented on issue #10009: [FLINK-14304] WIP -- Avoid task starvation 
with mailbox
URL: https://github.com/apache/flink/pull/10009#issuecomment-546723463
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 2b39236d76b1c56d240f2476c535568f0614c577 (Sun Oct 27 
19:02:14 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-14304).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise opened a new pull request #10009: [FLINK-14304] WIP -- Avoid task starvation with mailbox

2019-10-27 Thread GitBox
AHeise opened a new pull request #10009: [FLINK-14304] WIP -- Avoid task 
starvation with mailbox
URL: https://github.com/apache/flink/pull/10009
 
 
   
   
   ## What is the purpose of the change
   
   Currently, all mails are always prioritized over regular input, which makes 
sense in most cases. However, it's easy to devise an operator that gets into 
starvation: each mail enqueues a new mail.
   
   In concept, this ticket implements a simple extension in the mailbox 
processor: instead of draining the mailbox one-by-one, fetch all mails from the 
mailbox and run them one-by-one before running the default action. Only then, 
fetch all mails again and repeat. So we execute all mails that are available at 
the start of this loop but no mails that are added in the meantime.
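
   To make the batching idea concrete, here is a hedged, single-threaded sketch. 
The class and method names are made up and the real TaskMailbox is thread-safe 
and considerably more involved, but the key property is visible: mails enqueued 
while a batch runs are deferred to the next batch, so a mail that keeps 
enqueueing new mails can no longer starve the default action.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hedged sketch of batch-wise mail processing; not the actual TaskMailbox API. */
public class BatchingMailboxLoop {

	private final Queue<Runnable> mailbox = new ArrayDeque<>();

	public void enqueue(Runnable mail) {
		mailbox.add(mail);
	}

	public void runLoop(Runnable defaultAction, int iterations) {
		for (int i = 0; i < iterations; i++) {
			// Snapshot the mails available *now*; later arrivals wait for the next batch.
			List<Runnable> batch = new ArrayList<>(mailbox);
			mailbox.clear();
			for (Runnable mail : batch) {
				mail.run();
			}
			// The default action (e.g. processing regular input) runs after every batch.
			defaultAction.run();
		}
	}
}
```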
   
   ## Brief change log
   
   - Add batch capability to `TaskMailbox`. A batch takes a snapshot of the 
current mails and makes them available for subsequent processing.
   - `MailProcessor` only processes a batch per invocation of `#processMail()`
   - Major cleanup of `Mailbox` to clearly show the underlying threading model.
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   Mostly covered by existing tests.
   - One new test that explicitly tests task starvation.
   - New tests for the new interfaces.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no, but it refines 
Mailbox model)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14304) Avoid task starvation with mailbox

2019-10-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14304:
---
Labels: pull-request-available  (was: )

> Avoid task starvation with mailbox
> --
>
> Key: FLINK-14304
> URL: https://issues.apache.org/jira/browse/FLINK-14304
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Arvid Heise
>Priority: Major
>  Labels: pull-request-available
>
> Currently, all mails are always prioritized over regular input, which makes 
> sense in most cases. However, it's easy to devise an operator that gets into 
> starvation: each mail enqueues a new mail.
> This ticket implements a simple extension in the mailbox processor: instead 
> of draining the mailbox one-by-one, fetch all mails from the mailbox and run 
> them one-by-one before running the default action. Only then, fetch all mails 
> again and repeat.
> So we execute all mails that are available at the start of this loop but no 
> mails that are added in the meantime.
> Special attention needs to be paid to yield-to-downstream, so that it doesn't 
> process mails outside of the current batch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread GitBox
flinkbot edited a comment on issue #10008: [FLINK-14403] Remove uesless 
NotifyCheckpointComplete and TriggerCheckpoint
URL: https://github.com/apache/flink/pull/10008#issuecomment-546719490
 
 
   
   ## CI report:
   
   * 4e9fe99659cd1826b676b87a1029453fc46edccb : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/133729777)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread GitBox
flinkbot commented on issue #10008: [FLINK-14403] Remove uesless 
NotifyCheckpointComplete and TriggerCheckpoint
URL: https://github.com/apache/flink/pull/10008#issuecomment-546719490
 
 
   
   ## CI report:
   
   * 4e9fe99659cd1826b676b87a1029453fc46edccb : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread GitBox
flinkbot commented on issue #10008: [FLINK-14403] Remove uesless 
NotifyCheckpointComplete and TriggerCheckpoint
URL: https://github.com/apache/flink/pull/10008#issuecomment-546719029
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 4e9fe99659cd1826b676b87a1029453fc46edccb (Sun Oct 27 
18:06:41 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread GitBox
Myasuka commented on issue #10008: [FLINK-14403] Remove uesless 
NotifyCheckpointComplete and TriggerCheckpoint
URL: https://github.com/apache/flink/pull/10008#issuecomment-546718936
 
 
   CC @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14403) Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14403:
---
Labels: pull-request-available  (was: )

> Remove uesless NotifyCheckpointComplete and TriggerCheckpoint
> -
>
> Key: FLINK-14403
> URL: https://issues.apache.org/jira/browse/FLINK-14403
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>
> After [FLINK-12322|https://issues.apache.org/jira/browse/FLINK-12322] was fixed, 
> we removed the legacy {{ActorTaskManagerGateway}} and disabled the usage of 
> {{NotifyCheckpointComplete}} and {{TriggerCheckpoint}}. 
> However, these classes still exist, so we should also remove them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] Myasuka opened a new pull request #10008: [FLINK-14403] Remove uesless NotifyCheckpointComplete and TriggerCheckpoint

2019-10-27 Thread GitBox
Myasuka opened a new pull request #10008: [FLINK-14403] Remove uesless 
NotifyCheckpointComplete and TriggerCheckpoint
URL: https://github.com/apache/flink/pull/10008
 
 
   ## What is the purpose of the change
   
   After FLINK-12322 was fixed, we removed the legacy ActorTaskManagerGateway and 
disabled the usage of NotifyCheckpointComplete and TriggerCheckpoint. 
However, these classes still exist, so we should also remove them.
   
   ## Brief change log
   
 - Remove code of `NotifyCheckpointComplete`, `TriggerCheckpoint` and its 
usage in test `CheckpointMessagesTest`.
   
   ## Verifying this change
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers:  no 
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-14482) Bump up rocksdb version to support WriteBufferManager

2019-10-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16960649#comment-16960649
 ] 

Yun Tang commented on FLINK-14482:
--

[~trohrmann] Yes, we first planned to upgrade the base RocksDB version of FRocksDB 
but found some performance regressions. For now, I think we still have to keep the 
previous base RocksDB version and cherry-pick the write-buffer-manager feature code 
into Flink 1.10.

From my point of view, this issue targets Flink 1.11, so we still need to keep it 
as {{open}}.

> Bump up rocksdb version to support WriteBufferManager
> -
>
> Key: FLINK-14482
> URL: https://issues.apache.org/jira/browse/FLINK-14482
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.11.0
>
>
> The current RocksDB 5.17.2 does not support the write buffer manager well; we need 
> to bump the RocksDB version to support that feature.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-10-27 Thread GitBox
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add 
the class for multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#discussion_r339354617
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.statistics.basicstatistic;
+
+import org.apache.flink.ml.common.linalg.BLAS;
+import org.apache.flink.ml.common.linalg.DenseMatrix;
+import org.apache.flink.ml.common.linalg.DenseVector;
+import org.apache.flink.ml.common.linalg.SparseVector;
+import org.apache.flink.ml.common.linalg.Vector;
+
+import com.github.fommil.netlib.LAPACK;
+import org.netlib.util.intW;
+
+/**
+ * This class provides basic functionality for a Multivariate Gaussian 
(Normal) Distribution.
+ */
+public class MultivariateGaussian {
+
+   private static final LAPACK LAPACK_INST = LAPACK.getInstance();
+   private static final double EPSILON;
+
+   static {
+   double eps = 1.0;
+   while ((1.0 + (eps / 2.0)) != 1.0) {
+   eps /= 2.0;
+   }
+   EPSILON = eps;
+   }
+
+   private final DenseVector mean;
+   private final DenseMatrix cov;
+
+   private DenseMatrix rootSigmaInv;
+   private double u;
+
+   // data buffers for computing pdf
+   private DenseVector delta;
+   private DenseVector v;
+
+   /**
+* The constructor.
+*
+* @param mean The mean vector of the distribution.
+* @param cov  The covariance matrix of the distribution.
+*/
+   public MultivariateGaussian(DenseVector mean, DenseMatrix cov) {
+   this.mean = mean;
+   this.cov = cov;
+   this.delta = DenseVector.zeros(mean.size());
+   this.v = DenseVector.zeros(mean.size());
+   calculateCovarianceConstants();
+   }
+
+   /**
+* Returns density of this multivariate Gaussian at given point, x.
+*/
+   public double pdf(Vector x) {
+   return Math.exp(logpdf(x));
+   }
+
+   /**
+* Returns the log-density of this multivariate Gaussian at given 
point, x.
+*/
+   public double logpdf(Vector x) {
+   int n = mean.size();
+   System.arraycopy(mean.getData(), 0, delta.getData(), 0, n);
+   BLAS.scal(-1.0, delta);
+   if (x instanceof DenseVector) {
+   BLAS.axpy(1., (DenseVector) x, delta);
+   } else if (x instanceof SparseVector) {
+   BLAS.axpy(1., (SparseVector) x, delta);
+   }
+   BLAS.gemv(1.0, rootSigmaInv, false, delta, 0., v);
+   return u - 0.5 * BLAS.dot(v, v);
+   }
+
+   /**
+* Compute distribution dependent constants.
+*
+* The probability density function is calculated as:
+* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t 
* inv(sigma) * (x-mu))
+*
+* Here we compute the following distribution dependent constants 
that can be reused in each pdf computation:
+* A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^)
+* B) rootSigmaInv = sqrt(inv(sigma)) = D^(-1/2)^ * U.t
+*
+* 
+*  sigma = U * D * U.t
+*  inv(sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * 
(D^{-1/2}^ * U.t)
+*  sqrt(inv(sigma)) = D^(-1/2)^ * U.t
+* 
+*/
+   private void calculateCovarianceConstants() {
+   int n = this.mean.size();
+   int lwork = 3 * n - 1;
+   double[] matA = new double[n * n];
+   double[] work = new double[lwork];
+   double[] evs = new double[n];
+   intW info = new intW(0);
+
+   for (int i = 0; i < n; i++) {
+   System.arraycopy(cov.getData(), i * n, matA, i * n, i + 
1);
+   }
+   LAPACK_INST.dsyev("V", "U", n, matA, n, evs, work, lwork, info);
+
+   

[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-10-27 Thread GitBox
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add 
the class for multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#discussion_r339354266
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.statistics.basicstatistic;
+
+import org.apache.flink.ml.common.linalg.BLAS;
+import org.apache.flink.ml.common.linalg.DenseMatrix;
+import org.apache.flink.ml.common.linalg.DenseVector;
+import org.apache.flink.ml.common.linalg.SparseVector;
+import org.apache.flink.ml.common.linalg.Vector;
+
+import com.github.fommil.netlib.LAPACK;
+import org.netlib.util.intW;
+
+/**
+ * This class provides basic functionality for a Multivariate Gaussian 
(Normal) Distribution.
+ */
+public class MultivariateGaussian {
+
+   private static final LAPACK LAPACK_INST = LAPACK.getInstance();
+   private static final double EPSILON;
+
+   static {
+   double eps = 1.0;
+   while ((1.0 + (eps / 2.0)) != 1.0) {
+   eps /= 2.0;
+   }
+   EPSILON = eps;
+   }
+
+   private final DenseVector mean;
+   private final DenseMatrix cov;
+
+   private DenseMatrix rootSigmaInv;
+   private double u;
 
 Review comment:
   members can be `final`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-10-27 Thread GitBox
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add 
the class for multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#discussion_r339354298
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussianTest.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.statistics.basicstatistic;
+
+import org.apache.flink.ml.common.linalg.DenseMatrix;
+import org.apache.flink.ml.common.linalg.DenseVector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for MultivariateGaussian.
+ */
+public class MultivariateGaussianTest {
+   private static final double TOL = 1.0e-5;
+
 
 Review comment:
   add test for `Univariate` as well just in case?
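
   A rough sketch of such a 1-dimensional sanity check, assuming constructors like 
`new DenseVector(double[])` and `new DenseMatrix(int, int, double[])` exist in the 
flink-ml linalg API (adjust to the real signatures); the expected value is the 
standard normal density at 0, i.e. 1/sqrt(2*pi):

```java
@Test
public void testUnivariateStandardNormal() {
    // Hypothetical constructors; replace with the actual DenseVector/DenseMatrix API.
    DenseVector mean = new DenseVector(new double[] {0.0});
    DenseMatrix cov = new DenseMatrix(1, 1, new double[] {1.0});
    MultivariateGaussian gaussian = new MultivariateGaussian(mean, cov);

    // For the standard normal distribution, pdf(0) = 1 / sqrt(2 * pi) ≈ 0.3989423.
    double expected = 1.0 / Math.sqrt(2.0 * Math.PI);
    Assert.assertEquals(expected, gaussian.pdf(new DenseVector(new double[] {0.0})), TOL);
    Assert.assertEquals(Math.log(expected), gaussian.logpdf(new DenseVector(new double[] {0.0})), TOL);
}
```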


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-10-27 Thread GitBox
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add 
the class for multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#discussion_r339354650
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.statistics.basicstatistic;
+
+import org.apache.flink.ml.common.linalg.BLAS;
+import org.apache.flink.ml.common.linalg.DenseMatrix;
+import org.apache.flink.ml.common.linalg.DenseVector;
+import org.apache.flink.ml.common.linalg.SparseVector;
+import org.apache.flink.ml.common.linalg.Vector;
+
+import com.github.fommil.netlib.LAPACK;
+import org.netlib.util.intW;
+
+/**
+ * This class provides basic functionality for a Multivariate Gaussian 
(Normal) Distribution.
+ */
+public class MultivariateGaussian {
+
+   private static final LAPACK LAPACK_INST = LAPACK.getInstance();
+   private static final double EPSILON;
+
+   static {
+   double eps = 1.0;
+   while ((1.0 + (eps / 2.0)) != 1.0) {
+   eps /= 2.0;
+   }
+   EPSILON = eps;
+   }
+
+   private final DenseVector mean;
+   private final DenseMatrix cov;
+
+   private DenseMatrix rootSigmaInv;
+   private double u;
+
+   // data buffers for computing pdf
+   private DenseVector delta;
+   private DenseVector v;
+
+   /**
+* The constructor.
+*
+* @param mean The mean vector of the distribution.
+* @param cov  The covariance matrix of the distribution.
+*/
+   public MultivariateGaussian(DenseVector mean, DenseMatrix cov) {
+   this.mean = mean;
+   this.cov = cov;
+   this.delta = DenseVector.zeros(mean.size());
+   this.v = DenseVector.zeros(mean.size());
+   calculateCovarianceConstants();
+   }
+
+   /**
+* Returns density of this multivariate Gaussian at given point, x.
+*/
+   public double pdf(Vector x) {
+   return Math.exp(logpdf(x));
+   }
+
+   /**
+* Returns the log-density of this multivariate Gaussian at given 
point, x.
+*/
+   public double logpdf(Vector x) {
+   int n = mean.size();
+   System.arraycopy(mean.getData(), 0, delta.getData(), 0, n);
+   BLAS.scal(-1.0, delta);
+   if (x instanceof DenseVector) {
+   BLAS.axpy(1., (DenseVector) x, delta);
+   } else if (x instanceof SparseVector) {
+   BLAS.axpy(1., (SparseVector) x, delta);
+   }
+   BLAS.gemv(1.0, rootSigmaInv, false, delta, 0., v);
+   return u - 0.5 * BLAS.dot(v, v);
+   }
+
+   /**
+* Compute distribution dependent constants.
+*
+* The probability density function is calculated as:
+* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t 
* inv(sigma) * (x-mu))
+*
+* Here we compute the following distribution dependent constants 
that can be reused in each pdf computation:
+* A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^)
+* B) rootSigmaInv = sqrt(inv(sigma)) = D^(-1/2)^ * U.t
+*
+* 
+*  sigma = U * D * U.t
+*  inv(sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * 
(D^{-1/2}^ * U.t)
+*  sqrt(inv(sigma)) = D^(-1/2)^ * U.t
+* 
+*/
+   private void calculateCovarianceConstants() {
+   int n = this.mean.size();
+   int lwork = 3 * n - 1;
+   double[] matA = new double[n * n];
+   double[] work = new double[lwork];
+   double[] evs = new double[n];
+   intW info = new intW(0);
+
+   for (int i = 0; i < n; i++) {
+   System.arraycopy(cov.getData(), i * n, matA, i * n, i + 
1);
+   }
+   LAPACK_INST.dsyev("V", "U", n, matA, n, evs, work, lwork, info);
+
+   

[GitHub] [flink] walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add the class for multivariate Gaussian Distribution.

2019-10-27 Thread GitBox
walterddr commented on a change in pull request #9733: [FLINK-14154][ml] Add 
the class for multivariate Gaussian Distribution.
URL: https://github.com/apache/flink/pull/9733#discussion_r339354278
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common.statistics.basicstatistic;
+
+import org.apache.flink.ml.common.linalg.BLAS;
+import org.apache.flink.ml.common.linalg.DenseMatrix;
+import org.apache.flink.ml.common.linalg.DenseVector;
+import org.apache.flink.ml.common.linalg.SparseVector;
+import org.apache.flink.ml.common.linalg.Vector;
+
+import com.github.fommil.netlib.LAPACK;
+import org.netlib.util.intW;
+
+/**
+ * This class provides basic functionality for a Multivariate Gaussian 
(Normal) Distribution.
+ */
+public class MultivariateGaussian {
+
+   private static final LAPACK LAPACK_INST = LAPACK.getInstance();
+   private static final double EPSILON;
+
+   static {
+   double eps = 1.0;
+   while ((1.0 + (eps / 2.0)) != 1.0) {
+   eps /= 2.0;
+   }
+   EPSILON = eps;
+   }
+
+   private final DenseVector mean;
+   private final DenseMatrix cov;
+
+   private DenseMatrix rootSigmaInv;
+   private double u;
+
+   // data buffers for computing pdf
+   private DenseVector delta;
+   private DenseVector v;
+
+   /**
+* The constructor.
+*
+* @param mean The mean vector of the distribution.
+* @param cov  The covariance matrix of the distribution.
+*/
+   public MultivariateGaussian(DenseVector mean, DenseMatrix cov) {
+   this.mean = mean;
+   this.cov = cov;
+   this.delta = DenseVector.zeros(mean.size());
+   this.v = DenseVector.zeros(mean.size());
+   calculateCovarianceConstants();
+   }
+
+   /**
+* Returns density of this multivariate Gaussian at given point, x.
+*/
+   public double pdf(Vector x) {
+   return Math.exp(logpdf(x));
+   }
+
+   /**
+* Returns the log-density of this multivariate Gaussian at given 
point, x.
+*/
+   public double logpdf(Vector x) {
+   int n = mean.size();
+   System.arraycopy(mean.getData(), 0, delta.getData(), 0, n);
+   BLAS.scal(-1.0, delta);
+   if (x instanceof DenseVector) {
+   BLAS.axpy(1., (DenseVector) x, delta);
+   } else if (x instanceof SparseVector) {
+   BLAS.axpy(1., (SparseVector) x, delta);
+   }
+   BLAS.gemv(1.0, rootSigmaInv, false, delta, 0., v);
+   return u - 0.5 * BLAS.dot(v, v);
+   }
+
+   /**
+* Compute distribution dependent constants.
+*
+* The probability density function is calculated as:
+* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t 
* inv(sigma) * (x-mu))
+*
+* Here we compute the following distribution dependent constants 
that can be reused in each pdf computation:
+* A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^)
+* B) rootSigmaInv = sqrt(inv(sigma)) = D^(-1/2)^ * U.t
+*
+* 
+*  sigma = U * D * U.t
+*  inv(sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * 
(D^{-1/2}^ * U.t)
+*  sqrt(inv(sigma)) = D^(-1/2)^ * U.t
+* 
+*/
+   private void calculateCovarianceConstants() {
+   int n = this.mean.size();
+   int lwork = 3 * n - 1;
+   double[] matA = new double[n * n];
+   double[] work = new double[lwork];
+   double[] evs = new double[n];
+   intW info = new intW(0);
+
+   for (int i = 0; i < n; i++) {
+   System.arraycopy(cov.getData(), i * n, matA, i * n, i + 
1);
+   }
 
 Review comment:
   `System.arraycopy(cov.getData(), 0, matA, 0, n*n);`?
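
   For context, the existing loop copies only the upper triangle of `cov` into `matA` 
(assuming column-major storage), and `dsyev` is called with uplo = "U", so it only 
reads that triangle; copying the whole array should therefore be equivalent:

```java
// Equivalent full copy (dsyev with uplo = "U" ignores the lower triangle anyway):
System.arraycopy(cov.getData(), 0, matA, 0, n * n);
```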

