[jira] [Commented] (FLINK-7788) Allow port range for queryable state client proxy.
    [ https://issues.apache.org/jira/browse/FLINK-7788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208891#comment-16208891 ]

Kostas Kloudas commented on FLINK-7788:
---------------------------------------

Merged at 5338f856b42422189246130cc245754162fa9913

> Allow port range for queryable state client proxy.
> ---------------------------------------------------
>
>                 Key: FLINK-7788
>                 URL: https://issues.apache.org/jira/browse/FLINK-7788
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.4.0
>
>
> Currently the newly introduced queryable state client proxy can only take one
> port as a parameter to bind to. In case of multiple proxies running on one
> machine, this can result in port clashes and inability to start the
> corresponding proxies.
> This issue proposes to allow the specification of a port range, so that if
> some ports in the range are occupied, the proxy can still pick from the
> remaining free ones.
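Since the merged fix itself is not quoted in this thread, here is a minimal, hypothetical Java sketch of the idea the ticket describes — try each port in the configured range and bind to the first free one. Class and method names are illustrative assumptions, not Flink's actual implementation.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper, not Flink code: binds to the first free port in
// [start, end] so that several proxies on one machine do not clash.
public class PortRangeBinder {

	public static ServerSocket bindToFirstFreePort(int start, int end) throws IOException {
		for (int port = start; port <= end; port++) {
			ServerSocket socket = new ServerSocket();
			try {
				socket.bind(new InetSocketAddress(port));
				return socket; // this port was free
			} catch (IOException e) {
				socket.close(); // port already occupied (e.g. by another proxy); try the next one
			}
		}
		throw new IOException("No free port in range " + start + "-" + end);
	}
}
```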
[jira] [Closed] (FLINK-7788) Allow port range for queryable state client proxy.
     [ https://issues.apache.org/jira/browse/FLINK-7788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas closed FLINK-7788.
---------------------------------
    Resolution: Fixed

> Allow port range for queryable state client proxy.
> ---------------------------------------------------
>
>                 Key: FLINK-7788
>                 URL: https://issues.apache.org/jira/browse/FLINK-7788
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.4.0
>
>
> Currently the newly introduced queryable state client proxy can only take one
> port as a parameter to bind to. In case of multiple proxies running on one
> machine, this can result in port clashes and inability to start the
> corresponding proxies.
> This issue proposes to allow the specification of a port range, so that if
> some ports in the range are occupied, the proxy can still pick from the
> remaining free ones.
[jira] [Commented] (FLINK-5920) Make port (range) of queryable state server configurable
    [ https://issues.apache.org/jira/browse/FLINK-5920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208890#comment-16208890 ]

Kostas Kloudas commented on FLINK-5920:
---------------------------------------

Merged at 717a7dc81d066dc7d6e1a17099c0f5e1bc96b5d1

> Make port (range) of queryable state server configurable
> ---------------------------------------------------------
>
>                 Key: FLINK-5920
>                 URL: https://issues.apache.org/jira/browse/FLINK-5920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>    Affects Versions: 1.3.0
>            Reporter: Yelei Feng
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> We should support setting a port range for the config {{query.server.port}}.
[jira] [Closed] (FLINK-5920) Make port (range) of queryable state server configurable
     [ https://issues.apache.org/jira/browse/FLINK-5920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas closed FLINK-5920.
---------------------------------
    Resolution: Fixed

> Make port (range) of queryable state server configurable
> ---------------------------------------------------------
>
>                 Key: FLINK-5920
>                 URL: https://issues.apache.org/jira/browse/FLINK-5920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>    Affects Versions: 1.3.0
>            Reporter: Yelei Feng
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> We should support setting a port range for the config {{query.server.port}}.
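Neither ticket shows the accepted syntax for the range value, so the following parser is only a sketch under the assumption of a simple "9067" or "9067-9071" style string; Flink's real parsing may differ (for example, it may also accept comma-separated lists).

```java
// Hypothetical parser for a "9067" or "9067-9071" style config value.
public final class PortRange {
	public final int start;
	public final int end;

	private PortRange(int start, int end) {
		this.start = start;
		this.end = end;
	}

	public static PortRange parse(String value) {
		int dash = value.indexOf('-');
		if (dash < 0) {
			int port = Integer.parseInt(value.trim());
			return new PortRange(port, port); // single port = degenerate range
		}
		int start = Integer.parseInt(value.substring(0, dash).trim());
		int end = Integer.parseInt(value.substring(dash + 1).trim());
		if (start > end) {
			throw new IllegalArgumentException("Invalid port range: " + value);
		}
		return new PortRange(start, end);
	}
}
```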
[GitHub] flink pull request #4814: [FLINK-5920][FLINK-7788][QS] Allow to specify a ra...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4814
[jira] [Commented] (FLINK-5920) Make port (range) of queryable state server configurable
    [ https://issues.apache.org/jira/browse/FLINK-5920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1620#comment-1620 ]

ASF GitHub Bot commented on FLINK-5920:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4814

> Make port (range) of queryable state server configurable
> ---------------------------------------------------------
>
>                 Key: FLINK-5920
>                 URL: https://issues.apache.org/jira/browse/FLINK-5920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>    Affects Versions: 1.3.0
>            Reporter: Yelei Feng
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> We should support setting a port range for the config {{query.server.port}}.
[GitHub] flink pull request #4838: [FLINK-7826][QS] Add support for all types of stat...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4838#discussion_r145326904

--- Diff: flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingStateTest}.
--- End diff --

typo, should be `ImmutableAggregatingState`
[jira] [Commented] (FLINK-7826) Add support for all types of state to the QueryableStateClient.
    [ https://issues.apache.org/jira/browse/FLINK-7826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208876#comment-16208876 ]

ASF GitHub Bot commented on FLINK-7826:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4838#discussion_r145326904

--- Diff: flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableAggregatingStateTest}.
--- End diff --

typo, should be `ImmutableAggregatingState`

> Add support for all types of state to the QueryableStateClient.
> ----------------------------------------------------------------
>
>                 Key: FLINK-7826
>                 URL: https://issues.apache.org/jira/browse/FLINK-7826
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.4.0
[jira] [Commented] (FLINK-5920) Make port (range) of queryable state server configurable
    [ https://issues.apache.org/jira/browse/FLINK-5920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208871#comment-16208871 ]

ASF GitHub Bot commented on FLINK-5920:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4814

    The changes look good! Could you please merge this as soon as possible? We're in a bit of a hurry with the release. 😅

> Make port (range) of queryable state server configurable
> ---------------------------------------------------------
>
>                 Key: FLINK-5920
>                 URL: https://issues.apache.org/jira/browse/FLINK-5920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Queryable State
>    Affects Versions: 1.3.0
>            Reporter: Yelei Feng
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> We should support setting a port range for the config {{query.server.port}}.
[GitHub] flink issue #4814: [FLINK-5920][FLINK-7788][QS] Allow to specify a range of ...
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4814

    The changes look good! Could you please merge this as soon as possible? We're in a bit of a hurry with the release. 😅
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
    [ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208868#comment-16208868 ]

ASF GitHub Bot commented on FLINK-7076:
---------------------------------------

Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r145325673

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 	}
 
 	@Override
-	public void stopWorker(ResourceID resourceID) {
-		// TODO: Implement to stop the worker
+	public void stopWorker(YarnWorkerNode workerNode) {
+		workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Since we will need to keep workerNodeMap anyway, we can actually just look up the containerId using the resourceId from workerNodeMap; there is no need to calculate the containerId from the resourceId, so I think we don't need to use ContainerId.fromString. Also, copying the code will cause compatibility issues if the fromString method differs across Hadoop versions. What do you think?

> Implement container release to support dynamic scaling
> --------------------------------------------------------
>
>                 Key: FLINK-7076
>                 URL: https://issues.apache.org/jira/browse/FLINK-7076
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Shuyi Chen
>              Labels: flip-6
>
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be
> able to dynamically free containers. We have to implement the
> {{YarnResourceManager#stopWorker}} method.
[GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop...
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4729#discussion_r145325673

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 	}
 
 	@Override
-	public void stopWorker(ResourceID resourceID) {
-		// TODO: Implement to stop the worker
+	public void stopWorker(YarnWorkerNode workerNode) {
+		workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Since we will need to keep workerNodeMap anyway, we can actually just look up the containerId using the resourceId from workerNodeMap; there is no need to calculate the containerId from the resourceId, so I think we don't need to use ContainerId.fromString. Also, copying the code will cause compatibility issues if the fromString method differs across Hadoop versions. What do you think?
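A compressed sketch of the reviewer's suggestion — keep the container handle inside the worker-node map entry and look it up by resource id, instead of re-deriving a ContainerId from a string. The `...Sketch` types below are stand-ins, not Flink's actual classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for Flink's YarnWorkerNode: it remembers the YARN container it
// was started with, so stopping a worker never needs ContainerId.fromString.
class YarnWorkerNodeSketch {
	private final String resourceId;   // Flink's ResourceID, as a string
	private final Object container;    // stand-in for org.apache.hadoop.yarn.api.records.Container

	YarnWorkerNodeSketch(String resourceId, Object container) {
		this.resourceId = resourceId;
		this.container = container;
	}

	String getResourceIdString() { return resourceId; }
	Object getContainer() { return container; }
}

class WorkerRegistrySketch {
	private final Map<String, YarnWorkerNodeSketch> workerNodeMap = new ConcurrentHashMap<>();

	void stopWorker(YarnWorkerNodeSketch workerNode) {
		// Remove the node by its resource id; the container handle travels
		// with the node, so no id re-parsing is required.
		YarnWorkerNodeSketch node = workerNodeMap.remove(workerNode.getResourceIdString());
		if (node != null) {
			releaseContainer(node.getContainer());
		}
	}

	private void releaseContainer(Object container) {
		// In the real code this would call the YARN AM-RM client, e.g.
		// resourceManagerClient.releaseAssignedContainer(container.getId()).
	}
}
```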
[jira] [Created] (FLINK-7864) Support side-outputs in CoProcessFunction
Aljoscha Krettek created FLINK-7864:
------------------------------------

             Summary: Support side-outputs in CoProcessFunction
                 Key: FLINK-7864
                 URL: https://issues.apache.org/jira/browse/FLINK-7864
             Project: Flink
          Issue Type: Improvement
          Components: DataStream API
            Reporter: Aljoscha Krettek
             Fix For: 1.4.0


We forgot to add support for side-outputs when we added that to {{ProcessFunction}}. Should be as easy as adding it to the {{Context}} and wiring it in.
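For reference, this is roughly how side outputs are already used with ProcessFunction (the API the ticket wants mirrored in CoProcessFunction); a sketch against the Flink 1.3/1.4 streaming API, with an illustrative tag name and value types.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {

	// The tag must be an anonymous subclass so Flink can capture the
	// side output's type information.
	static final OutputTag<String> REJECTED = new OutputTag<String>("rejected") {};

	public static SingleOutputStreamOperator<Integer> process(DataStream<Integer> input) {
		return input.process(new ProcessFunction<Integer, Integer>() {
			@Override
			public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
				if (value >= 0) {
					out.collect(value);                          // main output
				} else {
					ctx.output(REJECTED, "negative: " + value);  // side output
				}
			}
		});
	}
}
// Consumers read the side output via:
//   DataStream<String> rejected = process(input).getSideOutput(REJECTED);
```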
[jira] [Commented] (FLINK-7703) Port JobExceptionsHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208814#comment-16208814 ]

ASF GitHub Bot commented on FLINK-7703:
---------------------------------------

Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4843

    @zentol Thank you for your review. I think you're right about ExecutionExceptionsInfo and JobExceptionsInfo, and I have fixed them already. THX

> Port JobExceptionsHandler to new REST endpoint
> ----------------------------------------------
>
>                 Key: FLINK-7703
>                 URL: https://issues.apache.org/jira/browse/FLINK-7703
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, REST, Webfrontend
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Fang Yong
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Port existing {{JobExceptionsHandler}} to new REST endpoint.
[GitHub] flink issue #4843: [FLINK-7703] Port JobExceptionsHandler to new REST endpoi...
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4843

    @zentol Thank you for your review. I think you're right about ExecutionExceptionsInfo and JobExceptionsInfo, and I have fixed them already. THX
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
    [ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208693#comment-16208693 ]

ASF GitHub Bot commented on FLINK-7853:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145299001

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
@@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
--- End diff --

This is the same as in the batch case.

> This test should have been used as a normal test case for this join, i.e., we don't expect it to raise an exception. I'd suggest keeping it for later use; otherwise we may not be aware that the test is incomplete in the future.

> Reject table function outer joins with predicates in Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-7853
>                 URL: https://issues.apache.org/jira/browse/FLINK-7853
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Due to CALCITE-2004, table function outer joins cannot be executed normally.
> We should cover it up by rejecting join predicates temporarily, until the
> issue is fixed in Calcite.
[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145299001

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
@@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
--- End diff --

This is the same as in the batch case.

> This test should have been used as a normal test case for this join, i.e., we don't expect it to raise an exception. I'd suggest keeping it for later use; otherwise we may not be aware that the test is incomplete in the future.
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
    [ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208686#comment-16208686 ]

ASF GitHub Bot commented on FLINK-7853:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145298482

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
@@ -82,6 +82,42 @@ class CorrelateITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
--- End diff --

This test should have been used as a normal test case for this join, i.e., we don't expect it to raise an exception. I'd suggest keeping it for later use; otherwise we may not be aware that the test is incomplete in the future.

> Reject table function outer joins with predicates in Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-7853
>                 URL: https://issues.apache.org/jira/browse/FLINK-7853
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Due to CALCITE-2004, table function outer joins cannot be executed normally.
> We should cover it up by rejecting join predicates temporarily, until the
> issue is fixed in Calcite.
[GitHub] flink pull request #4842: [FLINK-7853] [table] Reject table function outer j...
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4842#discussion_r145298482

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
@@ -82,6 +82,42 @@ class CorrelateITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+    * Due to CALCITE-2004, common join predicates are temporarily forbidden.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
--- End diff --

This test should have been used as a normal test case for this join, i.e., we don't expect it to raise an exception. I'd suggest keeping it for later use; otherwise we may not be aware that the test is incomplete in the future.
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
    [ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208674#comment-16208674 ]

ASF GitHub Bot commented on FLINK-7608:
---------------------------------------

Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145295629

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogram.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+/**
+ * The {@link LatencyHistogram} uses a {@link DescriptiveStatistics} as a Flink {@link Histogram}.
+ */
+public class LatencyHistogram implements org.apache.flink.metrics.Histogram {
--- End diff --

I do not know how to change this; could you give some suggestions?

> LatencyGauge change to histogram metric
> ----------------------------------------
>
>                 Key: FLINK-7608
>                 URL: https://issues.apache.org/jira/browse/FLINK-7608
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to
> export metrics to the log file.
> I found:
> {noformat}
> ---------------------- Gauges ----------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
[GitHub] flink pull request #4826: [FLINK-7608][metric] Refactoring latency statistic...
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145295629

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogram.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+/**
+ * The {@link LatencyHistogram} uses a {@link DescriptiveStatistics} as a Flink {@link Histogram}.
+ */
+public class LatencyHistogram implements org.apache.flink.metrics.Histogram {
--- End diff --

I do not know how to change this; could you give some suggestions?
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
    [ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208661#comment-16208661 ]

ASF GitHub Bot commented on FLINK-7608:
---------------------------------------

Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145294253

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogramStatistics.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+
+/**
+ * Latency histogram statistics implementation returned by {@link LatencyHistogram}.
+ * The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly.
+ */
+public class LatencyHistogramStatistics extends HistogramStatistics {
+	private final DescriptiveStatistics latencyHistogram;
+
+	public LatencyHistogramStatistics(DescriptiveStatistics latencyHistogram) {
+		this.latencyHistogram = latencyHistogram;
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		return latencyHistogram.getPercentile(quantile);
+	}
+
+	@Override
+	public long[] getValues() {
+		// Because latencyHistogram.getValues() returns double[]
+		throw new UnsupportedOperationException();
--- End diff --

yes, will change.

> LatencyGauge change to histogram metric
> ----------------------------------------
>
>                 Key: FLINK-7608
>                 URL: https://issues.apache.org/jira/browse/FLINK-7608
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to
> export metrics to the log file.
> I found:
> {noformat}
> ---------------------- Gauges ----------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
[GitHub] flink pull request #4826: [FLINK-7608][metric] Refactoring latency statistic...
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145294253

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogramStatistics.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+
+/**
+ * Latency histogram statistics implementation returned by {@link LatencyHistogram}.
+ * The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly.
+ */
+public class LatencyHistogramStatistics extends HistogramStatistics {
+	private final DescriptiveStatistics latencyHistogram;
+
+	public LatencyHistogramStatistics(DescriptiveStatistics latencyHistogram) {
+		this.latencyHistogram = latencyHistogram;
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		return latencyHistogram.getPercentile(quantile);
+	}
+
+	@Override
+	public long[] getValues() {
+		// Because latencyHistogram.getValues() returns double[]
+		throw new UnsupportedOperationException();
--- End diff --

yes, will change.
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
    [ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208657#comment-16208657 ]

ASF GitHub Bot commented on FLINK-7608:
---------------------------------------

Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145293354

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogramStatistics.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+
+/**
+ * Latency histogram statistics implementation returned by {@link LatencyHistogram}.
+ * The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly.
+ */
+public class LatencyHistogramStatistics extends HistogramStatistics {
+	private final DescriptiveStatistics latencyHistogram;
+
+	public LatencyHistogramStatistics(DescriptiveStatistics latencyHistogram) {
+		this.latencyHistogram = latencyHistogram;
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		return latencyHistogram.getPercentile(quantile);
--- End diff --

sorry, I will fix.

> LatencyGauge change to histogram metric
> ----------------------------------------
>
>                 Key: FLINK-7608
>                 URL: https://issues.apache.org/jira/browse/FLINK-7608
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to
> export metrics to the log file.
> I found:
> {noformat}
> ---------------------- Gauges ----------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
[GitHub] flink pull request #4826: [FLINK-7608][metric] Refactoring latency statistic...
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145293354

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogramStatistics.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+
+/**
+ * Latency histogram statistics implementation returned by {@link LatencyHistogram}.
+ * The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly.
+ */
+public class LatencyHistogramStatistics extends HistogramStatistics {
+	private final DescriptiveStatistics latencyHistogram;
+
+	public LatencyHistogramStatistics(DescriptiveStatistics latencyHistogram) {
+		this.latencyHistogram = latencyHistogram;
+	}
+
+	@Override
+	public double getQuantile(double quantile) {
+		return latencyHistogram.getPercentile(quantile);
--- End diff --

sorry, I will fix.
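For context on the `getPercentile(quantile)` line being discussed: Flink's `HistogramStatistics#getQuantile` takes a quantile in [0, 1], while commons-math's `DescriptiveStatistics#getPercentile` expects a percentage in (0, 100], so the value has to be scaled — presumably that is the fix the author has in mind. A minimal sketch:

```java
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

// Sketch of the fix: scale the [0, 1] quantile to the (0, 100] percentage
// that commons-math expects before delegating.
class QuantileAdapter {
	private final DescriptiveStatistics stats;

	QuantileAdapter(DescriptiveStatistics stats) {
		this.stats = stats;
	}

	double getQuantile(double quantile) {
		return stats.getPercentile(quantile * 100.0);
	}
}
```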
[GitHub] flink pull request #4826: [FLINK-7608][metric] Refactoring latency statistic...
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145293108

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogram.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+/**
+ * The {@link LatencyHistogram} uses a {@link DescriptiveStatistics} as a Flink {@link Histogram}.
+ */
+public class LatencyHistogram implements org.apache.flink.metrics.Histogram {
+
+	private final DescriptiveStatistics latencyHistogram;
+
+	public LatencyHistogram(int windowSize) {
+		// 512 element window (4 kb)
--- End diff --

will remove.
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
    [ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208656#comment-16208656 ]

ASF GitHub Bot commented on FLINK-7608:
---------------------------------------

Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4826#discussion_r145293108

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/latency/LatencyHistogram.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.latency;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+/**
+ * The {@link LatencyHistogram} uses a {@link DescriptiveStatistics} as a Flink {@link Histogram}.
+ */
+public class LatencyHistogram implements org.apache.flink.metrics.Histogram {
+
+	private final DescriptiveStatistics latencyHistogram;
+
+	public LatencyHistogram(int windowSize) {
+		// 512 element window (4 kb)
--- End diff --

will remove.

> LatencyGauge change to histogram metric
> ----------------------------------------
>
>                 Key: FLINK-7608
>                 URL: https://issues.apache.org/jira/browse/FLINK-7608
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to
> export metrics to the log file.
> I found:
> {noformat}
> ---------------------- Gauges ----------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
>     value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
[GitHub] flink issue #4661: [FLINK-4831][metrics] Implement a slf4j metric reporter
Github user yew1eb commented on the issue:

    https://github.com/apache/flink/pull/4661

    CC @zentol
[jira] [Commented] (FLINK-4831) Implement a slf4j metric reporter
    [ https://issues.apache.org/jira/browse/FLINK-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208652#comment-16208652 ]

ASF GitHub Bot commented on FLINK-4831:
---------------------------------------

Github user yew1eb commented on the issue:

    https://github.com/apache/flink/pull/4661

    CC @zentol

> Implement a slf4j metric reporter
> ---------------------------------
>
>                 Key: FLINK-4831
>                 URL: https://issues.apache.org/jira/browse/FLINK-4831
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.1.2
>            Reporter: Chesnay Schepler
>            Assignee: Hai Zhou UTC+8
>            Priority: Minor
>              Labels: easyfix, starter
>             Fix For: 1.4.0
>
>
> For debugging purposes it would be very useful to have a log4j metric
> reporter. If you don't want to set up a metric backend, you currently have to
> rely on JMX, which a) works a bit differently than other reporters (for
> example, it doesn't extend AbstractReporter) and b) makes it a bit tricky to
> analyze results, as metrics are cleaned up once a job finishes.
[GitHub] flink issue #4794: [build][minor] Add missing licenses
Github user yew1eb commented on the issue:

    https://github.com/apache/flink/pull/4794

    @StephanEwen, I see, we need a script to automatically check the license headers. I will think about how to implement this script...
[jira] [Commented] (FLINK-7758) Fix bug Kafka09Fetcher add offset metrics
    [ https://issues.apache.org/jira/browse/FLINK-7758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208645#comment-16208645 ]

ASF GitHub Bot commented on FLINK-7758:
---------------------------------------

Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4769#discussion_r143331183

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -543,6 +543,18 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
 	// ------------------------- Metrics ----------------------------------
 
 	/**
+	 * Register offset metrics.
+	 */
+	protected MetricGroup registerOffsetMetrics(MetricGroup metricGroup) {
+		if (useMetrics) {
+			MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
+			addOffsetStateGauge(kafkaMetricGroup);
+			return kafkaMetricGroup;
+		}
+		return null;
--- End diff --

Thanks @zentol for the suggestion. I will update the PR accordingly.

> Fix bug Kafka09Fetcher add offset metrics
> ------------------------------------------
>
>                 Key: FLINK-7758
>                 URL: https://issues.apache.org/jira/browse/FLINK-7758
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Metrics
>    Affects Versions: 1.3.2
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>             Fix For: 1.4.0
>
>
> In Kafka09Fetcher, the _KafkaConsumer_ kafkaMetricGroup is added without
> checking that the useMetrics variable is true.
[GitHub] flink pull request #4769: [FLINK-7758][kafka][hotfix] Fix bug Kafka09Fetcher...
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4769#discussion_r143331183

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -543,6 +543,18 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
 	// ------------------------- Metrics ----------------------------------
 
 	/**
+	 * Register offset metrics.
+	 */
+	protected MetricGroup registerOffsetMetrics(MetricGroup metricGroup) {
+		if (useMetrics) {
+			MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
+			addOffsetStateGauge(kafkaMetricGroup);
+			return kafkaMetricGroup;
+		}
+		return null;
--- End diff --

Thanks @zentol for the suggestion. I will update the PR accordingly.
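The body of `addOffsetStateGauge` is not quoted in this thread, but registering a gauge on the returned group would look roughly like the sketch below; the field and the gauge name are illustrative assumptions, not the connector's actual names.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class OffsetMetricsSketch {
	// Illustrative only: the real fetcher tracks offsets per partition state.
	private volatile long currentOffset = -1L;

	void addOffsetStateGauge(MetricGroup kafkaMetricGroup) {
		// MetricGroup#gauge registers a read-only callback that reporters poll.
		kafkaMetricGroup.gauge("currentOffset", new Gauge<Long>() {
			@Override
			public Long getValue() {
				return currentOffset;
			}
		});
	}
}
```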
[jira] [Commented] (FLINK-7843) Improve and enhance documentation for system metrics
    [ https://issues.apache.org/jira/browse/FLINK-7843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208643#comment-16208643 ]

Hai Zhou UTC+8 commented on FLINK-7843:
---------------------------------------

Hi [~Zentol], what do you think about this?

> Improve and enhance documentation for system metrics
> ------------------------------------------------------
>
>                 Key: FLINK-7843
>                 URL: https://issues.apache.org/jira/browse/FLINK-7843
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.3.2
>            Reporter: Hai Zhou UTC+8
>            Assignee: Hai Zhou UTC+8
>            Priority: Critical
>             Fix For: 1.4.0
>
>
> I think we should make the following improvements to the system metrics
> section in the documentation:
> # Add a column for the *Type* of each metric, e.g. Counters, Gauges,
> Histograms, and Meters
> # Improve the *Description* of each metric by adding the unit, e.g. in bytes,
> in megabytes, in nanoseconds, in milliseconds
[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
    [ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208634#comment-16208634 ]

Vijay Srinivasaraghavan commented on FLINK-7737:
------------------------------------------------

I am trying to understand the changes that went in as part of the PR (https://github.com/apache/flink/pull/4781). I see a few FileSystemFactory implementations (HDFS, MapR, S3Presto, S3Hadoop, LocalFS) that handle the concrete FS invocation (plus configuration/scheme). There is no hsync() API call during open(); instead we call only hflush(), with the assumption that data will be appropriately synced to the disk by the underlying implementation. Is my understanding right?

If so, I am under the assumption that for stock HDFS, if the file system is created with the SYNC_BLOCK flag option, then the blocks will be synced to the disk upon close(); otherwise we need to invoke hsync() explicitly. With the current changes, if the TMs were to fail and recover, will the data on Hadoop DNs get synced to the disk?

> On HCFS systems, FSDataOutputStream does not issue hsync, only hflush, which
> leads to data loss
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the
> output stream is of type *HdfsDataOutputStream* but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> 	try {
> 		// At this point the refHflushOrSync cannot be null,
> 		// since register method would have thrown if it was.
> 		this.refHflushOrSync.invoke(os);
>
> 		if (os instanceof HdfsDataOutputStream) {
> 			((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> 		}
> 	} catch (InvocationTargetException e) {
> 		String msg = "Error while trying to hflushOrSync!";
> 		LOG.error(msg + " " + e.getCause());
>
> 		Throwable cause = e.getCause();
> 		if (cause != null && cause instanceof IOException) {
> 			throw (IOException) cause;
> 		}
>
> 		throw new RuntimeException(msg, e);
> 	} catch (Exception e) {
> 		String msg = "Error while trying to hflushOrSync!";
> 		LOG.error(msg + " " + e);
>
> 		throw new RuntimeException(msg, e);
> 	}
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
> 	((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> 	os.hsync();
> }
> {code}
[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL
    [ https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208539#comment-16208539 ]

ASF GitHub Bot commented on FLINK-7854:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4846#discussion_r145277609

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala ---
@@ -48,4 +49,19 @@ class FlinkCalciteSqlValidator(
     insert: SqlInsert): RelDataType = {
     typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
   }
+
+  override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = {
+    // Due to the improper translation of lateral table left outer joins (see CALCITE-2004), we
+    // need to temporarily forbid common predicates until the problem is fixed.
+    // The check for a join with a lateral table is actually quite tricky.
+    if (join.getJoinType == JoinType.LEFT &&
+        join.getRight.toString.startsWith("TABLE(")) { // TABLE (`func`(`foo`)) AS...
--- End diff --

I don't feel comfortable checking the string representation here. There should be a better way, maybe recursively checking for `SqlKind.COLLECTION_TABLE`:

```
private def isCollectionTable(node: SqlNode): Boolean = {
  if (node.getKind == SqlKind.COLLECTION_TABLE) {
    true
  } else {
    node match {
      case c: SqlCall =>
        c.getOperandList.asScala.filter(_ != null).exists(o => isCollectionTable(o))
      case l: SqlNodeList =>
        l.getList.asScala.filter(_ != null).exists(o => isCollectionTable(o))
      case _ => false
    }
  }
}
```

This method passes all tests, but I'm not 100% sure that it is correct and won't break for more complex queries. Maybe there are also some methods in `SqlValidatorImpl` that are helpful.

> Reject lateral table outer joins with predicates in SQL
> ---------------------------------------------------------
>
>                 Key: FLINK-7854
>                 URL: https://issues.apache.org/jira/browse/FLINK-7854
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed normally.
> We should cover it up by rejecting join predicates temporarily, until the
> issue is fixed in Calcite.
[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL
    [ https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208537#comment-16208537 ]

ASF GitHub Bot commented on FLINK-7854:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4846#discussion_r145268283

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala ---
@@ -75,20 +76,29 @@ class CorrelateTest extends TableTestBase {
     util.verifySql(sqlQuery2, expected2)
   }
 
-  @Test
-  def testLeftOuterJoin(): Unit = {
+  /**
+    * Due to the temporary restriction on lateral table outer joins (see FLINK-7854), this test
+    * cannot currently pass. In the future, the left local predicates should be pushed down.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithLeftLocalPredicates(): Unit = {
     val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     util.addFunction("func1", func1)
 
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
--- End diff --

Keep the previous test and add the new test to `validation.CorrelateValidationTest`.

> Reject lateral table outer joins with predicates in SQL
> ---------------------------------------------------------
>
>                 Key: FLINK-7854
>                 URL: https://issues.apache.org/jira/browse/FLINK-7854
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed normally.
> We should cover it up by rejecting join predicates temporarily, until the
> issue is fixed in Calcite.
[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL
    [ https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208540#comment-16208540 ]

ASF GitHub Bot commented on FLINK-7854:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4846#discussion_r145267860

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala ---
@@ -102,6 +107,23 @@ class CorrelateTest extends TableTestBase {
     util.verifySql(sqlQuery, expected)
   }
 
+  /**
+    * Due to the improper translation of TableFunction left outer joins (see CALCITE-2004), we can
+    * only accept literal true as the predicate for a lateral table left outer join.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
--- End diff --

Please move this test method to `validation.CorrelateValidationTest`.

> Reject lateral table outer joins with predicates in SQL
> ---------------------------------------------------------
>
>                 Key: FLINK-7854
>                 URL: https://issues.apache.org/jira/browse/FLINK-7854
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed normally.
> We should cover it up by rejecting join predicates temporarily, until the
> issue is fixed in Calcite.
[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL
    [ https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208538#comment-16208538 ]

ASF GitHub Bot commented on FLINK-7854:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4846#discussion_r145268307

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala ---
@@ -102,6 +112,22 @@ class CorrelateTest extends TableTestBase {
     util.verifySql(sqlQuery, expected)
   }
 
+  /**
+    * Due to the improper translation of TableFunction left outer joins (see CALCITE-2004), we can
+    * only accept literal true as the predicate for a lateral table left outer join.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
--- End diff --

Please move to `validation.CorrelateValidationTest`.

> Reject lateral table outer joins with predicates in SQL
> ---------------------------------------------------------
>
>                 Key: FLINK-7854
>                 URL: https://issues.apache.org/jira/browse/FLINK-7854
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed normally.
> We should cover it up by rejecting join predicates temporarily, until the
> issue is fixed in Calcite.
[GitHub] flink pull request #4846: [FLINK-7854] [table] Reject lateral table outer jo...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4846#discussion_r145277609

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala ---
@@ -48,4 +49,19 @@ class FlinkCalciteSqlValidator(
     insert: SqlInsert): RelDataType = {
     typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
   }
+
+  override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = {
+    // Due to the improper translation of lateral table left outer joins (see CALCITE-2004), we
+    // need to temporarily forbid common predicates until the problem is fixed.
+    // The check for a join with a lateral table is actually quite tricky.
+    if (join.getJoinType == JoinType.LEFT &&
+        join.getRight.toString.startsWith("TABLE(")) { // TABLE (`func`(`foo`)) AS...
--- End diff --

I don't feel comfortable checking the string representation here. There should be a better way, maybe recursively checking for `SqlKind.COLLECTION_TABLE`:

```
private def isCollectionTable(node: SqlNode): Boolean = {
  if (node.getKind == SqlKind.COLLECTION_TABLE) {
    true
  } else {
    node match {
      case c: SqlCall =>
        c.getOperandList.asScala.filter(_ != null).exists(o => isCollectionTable(o))
      case l: SqlNodeList =>
        l.getList.asScala.filter(_ != null).exists(o => isCollectionTable(o))
      case _ => false
    }
  }
}
```

This method passes all tests, but I'm not 100% sure that it is correct and won't break for more complex queries. Maybe there are also some methods in `SqlValidatorImpl` that are helpful.
[GitHub] flink pull request #4846: [FLINK-7854] [table] Reject lateral table outer jo...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4846#discussion_r145268283

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala ---
@@ -75,20 +76,29 @@ class CorrelateTest extends TableTestBase {
     util.verifySql(sqlQuery2, expected2)
   }
 
-  @Test
-  def testLeftOuterJoin(): Unit = {
+  /**
+    * Due to the temporary restriction on lateral table outer joins (see FLINK-7854), this test
+    * cannot currently pass. In the future, the left local predicates should be pushed down.
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithLeftLocalPredicates(): Unit = {
     val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     util.addFunction("func1", func1)
 
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
--- End diff --

Keep the previous test and add the new test to `validation.CorrelateValidationTest`.
[GitHub] flink pull request #4846: [FLINK-7854] [table] Reject lateral table outer jo...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4846#discussion_r145267860 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala --- @@ -102,6 +107,23 @@ class CorrelateTest extends TableTestBase { util.verifySql(sqlQuery, expected) } + /** +* Due to the improper translation of TableFunction left outer join (see CALCITE-2004), we can +* only accept literal true as the predicate for lateral table left outer join. +*/ + @Test (expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { --- End diff -- Please move this test method to `validation.CorrelateValidationTest` ---
[jira] [Commented] (FLINK-2976) Save and load checkpoints manually
[ https://issues.apache.org/jira/browse/FLINK-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208503#comment-16208503 ] ASF GitHub Bot commented on FLINK-2976: --- Github user coveralls commented on the issue: https://github.com/apache/flink/pull/1434 [![Coverage Status](https://coveralls.io/builds/13763854/badge)](https://coveralls.io/builds/13763854) Changes Unknown when pulling **d9743343ec0c268a99c46ec7324a603506827c78 on uce:2976-savepoints** into ** on apache:master**. > Save and load checkpoints manually > -- > > Key: FLINK-2976 > URL: https://issues.apache.org/jira/browse/FLINK-2976 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.0.0 > > > Currently, all checkpointed state is bound to a job. After the job finishes > all state is lost. In case of an HA cluster, jobs can live longer than the > cluster, but they still suffer from the same issue when they finish. > Multiple users have requested the feature to manually save a checkpoint in > order to resume from it at a later point. This is especially important for > production environments. As an example, consider upgrading your existing > production Flink program. Currently, you lose all the state of your program. > With the proposed mechanism, it will be possible to save a checkpoint, stop > and update your program, and then continue your program with the checkpoint. > The required operations can be simple: > saveCheckpoint(JobID) => checkpointID: long > loadCheckpoint(JobID, long) => void > For the initial version, I would apply the following restriction: > - The topology needs to stay the same (JobGraph parallelism, etc.) > A user can configure this behaviour via the environment like the > checkpointing interval. Furthermore, the user can trigger the save operation > via the command line at arbitrary times and load a checkpoint when submitting > a job, e.g. > bin/flink checkpoint => checkpointID: long > and > bin/flink run --loadCheckpoint JobID [latest saved checkpoint] > bin/flink run --loadCheckpoint (JobID,long) [specific saved checkpoint] > As far as I can tell, the required mechanisms are similar to the ones > implemented for JobManager high availability. We need to make sure to persist > the CompletedCheckpoint instances as a pointer to the checkpoint state and to > *not* remove saved checkpoint state. > On the client side, we need to give the job and its vertices the same IDs to > allow mapping the checkpoint state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
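For illustration, the proposed operations could be captured in a client-side interface along the following lines. This is a purely hypothetical sketch of the signatures from the issue text, not the savepoint API that Flink eventually shipped.

```
// Hypothetical client-side interface mirroring the operations proposed in FLINK-2976.
trait ManualCheckpointClient {

  /** Triggers a checkpoint for the given job and returns the checkpoint ID. */
  def saveCheckpoint(jobId: JobID): Long

  /** Resumes the given job from a previously saved checkpoint. */
  def loadCheckpoint(jobId: JobID, checkpointId: Long): Unit
}
```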
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208452#comment-16208452 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145262038 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala --- @@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* Due to CALCITE-2004, common join predicates are temporarily forbidden. +*/ + @Test (expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { +val t = testData(env).toTable(tEnv).as('a, 'b, 'c) +val func0 = new TableFunc0 + +val result = t + .leftOuterJoin(func0('c) as ('s, 'l), 'a === 'l) + .select('c, 's, 'l) + .toAppendStream[Row] + +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = "John#19,null,null\n" + "John#22,null,null\n" + "Anna44,null,null\n" + + "nosharp,null,null" +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testLeftOuterJoinWithWhere(): Unit = { --- End diff -- `testUserDefinedTableFunctionWithScalarFunction()` covers the same case (a predicate on a table function attribute). I think we can remove this test. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208444#comment-16208444 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145259863 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala --- @@ -93,9 +98,25 @@ class CorrelateTest extends TableTestBase { "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "LEFT") ), - term("select", "c", "s") + term("select", "c", "s"), + term("where", ">(s, '')") ) util.verifyTable(result, expected) } + + /** +* Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the +* join predicates can only be empty or literal true. +*/ + @Test (expected = classOf[ValidationException]) --- End diff -- There is a sub-package `validation` that contains tests that verify correct validation. Please move this test method into a new `CorrelateValidationTest` there. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208445#comment-16208445 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145260567 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala --- @@ -93,9 +98,25 @@ class CorrelateTest extends TableTestBase { "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"), term("joinType", "LEFT") ), - term("select", "c", "s") + term("select", "c", "s"), + term("where", ">(s, '')") ) util.verifyTable(result, expected) } + + /** +* Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the +* join predicates can only be empty or literal true. +*/ + @Test (expected = classOf[ValidationException]) --- End diff -- You can add a test for `leftOuterJoin(function(...) as 'x, true)` if you want to. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
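Such a positive test could be as small as the following sketch, written against the batch table test utilities; the method name is illustrative.

```
@Test
def testLeftOuterJoinWithLiteralTrue(): Unit = {
  val util = batchTestUtil()
  val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
  val func1 = new TableFunc1
  util.addFunction("func1", func1)

  // Literal true is the one predicate a table function left outer join
  // accepts, so constructing this expression must not throw.
  table.leftOuterJoin(func1('c) as 's, true)
}
```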
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208447#comment-16208447 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145260893 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala --- @@ -82,6 +82,42 @@ class CorrelateITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + /** +* Due to CALCITE-2004, common join predicates are temporarily forbidden. +*/ + @Test (expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { --- End diff -- The validation has been checked before. We don't need an ITCase for this. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208449#comment-16208449 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145253439 --- Diff: docs/dev/table/tableApi.md --- @@ -574,6 +574,7 @@ Table result = orders Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values. +Note: Currently the predicates for table function left outer join can only be empty or literal true. --- End diff -- -> `Currently, the predicate of a table function left outer join can only be empty or literal true.` > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208441#comment-16208441 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145253981 --- Diff: docs/dev/table/tableApi.md --- @@ -583,7 +584,8 @@ tEnv.registerFunction("split", split); // join Table orders = tableEnv.scan("Orders"); Table result = orders -.leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v"))) +.leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v")) +.where("a > 5") --- End diff -- I would not add this. It's a local predicate on the outer table and not related to the table function join. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208451#comment-16208451 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145260916 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala --- @@ -82,6 +82,45 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + /** +* Due to CALCITE-2004, common join predicates are temporarily forbidden. +*/ + @Test (expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { --- End diff -- The validation has been checked before. We don't need an ITCase for this. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208448#comment-16208448 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145260250 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala --- @@ -99,6 +104,21 @@ class CorrelateTest extends TableTestBase { util.verifyTable(result, expected) } + /** +* Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the +* join predicates can only be empty or literal true. +*/ + @Test (expected = classOf[ValidationException]) --- End diff -- move this test method to `validation.CorrelateValidationTest`. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208443#comment-16208443 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145254351 --- Diff: docs/dev/table/tableApi.md --- @@ -695,6 +698,7 @@ val split: TableFunction[_] = new MySplitUDTF() // join val result: Table = table .leftOuterJoin(split('c) as ('s, 't, 'v)) +.where('a > 5) --- End diff -- I would not add this. It's a local predicate on the outer table and not related to the table function join. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208446#comment-16208446 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145260098 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala --- @@ -73,13 +74,17 @@ class CorrelateTest extends TableTestBase { util.verifyTable(result2, expected2) } + /** --- End diff -- Please remove this comment. The test does not include a corresponding query. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208442#comment-16208442 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145253567 --- Diff: docs/dev/table/tableApi.md --- @@ -687,6 +689,7 @@ val result: Table = table Batch Streaming Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values. +Note: Currently the predicates for table function left outer join can only be empty or literal true. --- End diff -- -> `Currently, the predicate of a table function left outer join can only be empty or literal true.` > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208453#comment-16208453 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145261361 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala --- @@ -82,6 +82,42 @@ class CorrelateITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + /** +* Due to CALCITE-2004, common join predicates are temporarily forbidden. +*/ + @Test (expected = classOf[ValidationException]) + def testLeftOuterJoinWithPredicates(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env, config) +val in = testData(env).toTable(tableEnv).as('a, 'b, 'c) + +val func2 = new TableFunc2 +val result = in + .leftOuterJoin(func2('c) as ('s, 'l), 'a === 'l) + .select('c, 's, 'l) + .toDataSet[Row] +val results = result.collect() +val expected = "John#19,19,2\n" + "nosharp,null,null" +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testLeftOuterJoinWithWhere(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env, config) +val in = testData(env).toTable(tableEnv).as('a, 'b, 'c) + +val func2 = new TableFunc2 +val result = in + .leftOuterJoin(func2('c) as ('s, 'l), true) + .where('a >= 'l) // The where clause should be evaluated after the join. --- End diff -- actually, it can be evaluated before the join because it is on the outer side and not on the attributes of the table function. The `testWithFilter()` covers the interesting case (`where` and `filter` are identical). I think we can remove this test method. > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7853) Reject table function outer joins with predicates in Table API
[ https://issues.apache.org/jira/browse/FLINK-7853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208450#comment-16208450 ] ASF GitHub Bot commented on FLINK-7853: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4842#discussion_r145257154 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -476,20 +481,30 @@ case class Join( } else { nonEquiJoinPredicateFound = true } + // The boolean literal should be valid condition type. + case x: Literal if x.resultType == Types.BOOLEAN => case x => failValidation( s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x") } validateConditions(expression, isAndBranch = true) -if (!equiJoinPredicateFound) { - failValidation( -s"Invalid join condition: $expression. At least one equi-join predicate is " + - s"required.") -} -if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) { - failValidation( -s"Invalid join condition: $expression. Non-equality join predicates or local" + - s" predicates are not supported in outer joins.") + +// Due to CALCITE-2004, we cannot accept join predicates except literal true for TableFunction +// left outer join. +if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) { + if (!alwaysTrue) failValidation("TableFunction left outer join predicates can only be " + --- End diff -- `predicates` -> `predicate` > Reject table function outer joins with predicates in Table API > -- > > Key: FLINK-7853 > URL: https://issues.apache.org/jira/browse/FLINK-7853 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Blocker > Fix For: 1.4.0 > > > Due to CALCITE-2004, the table function outer joins can not be normally > executed. We should cover it up by rejecting join predicates temporarily, > until the issue is fixed in Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16208238#comment-16208238 ] Fabian Hueske commented on FLINK-7548: -- Thanks for the comments [~twalthr] and [~xccui]. @ Timo: I would not add {{TimestampExtractor}} and {{WatermarkAssigner}} to {{TableSchema}}. {{TableSchema}} is also used at different places and timestamp extractors & watermark assigners are not relevant in these contexts. IMO, {{TimestampExtractor}} and {{WatermarkAssigner}} belong to a {{TableSource}} but not to the schema. The schema could encode that these fields are non-nullable and watermarked (or however we want to call this property). I think the issue of defining fields twice is rather an issue of a concrete {{TableSource}} implementation but not so much of the internal API. Given the current API, it is possible to have a {{TableSource}} that requires defining a field just once (in whatever way) and generate an appropriate TableSchema and proctime field name. @ Xingcan: 1. With the current design it shouldn't be too hard to define a {{TimestampExtractor}} that parses a string field. I would not add it to the {{ExistingField}} extractor because it would need an additional parameter that specifies the formatting of the timestamp string. 2. Multiple rowtime attributes might not be possible yet but might be later. With this API we have the option to add this feature later without breaking the API. 3. It's also possible to add a {{TimestampExtractor}} that assigns the current timestamp as event timestamp, i.e., ingestion time. 4. Yes, that was my initial motivation to use RexNode instead of Table API Expressions. I was thinking that it would be easier to parse SQL expressions from a CREATE TABLE statement into RexNodes (using some Calcite code) than to Expressions. OTOH, I think it makes sense to be consistent and not expose Calcite API. I'd be happy to hear what others think about the RexNode vs. Expression choice. > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
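As an illustration of point 1, a string-parsing extractor might look roughly like this. It is purely hypothetical, since the `TimestampExtractor` interface was still being designed at this point: the method names and the `format` parameter are assumptions, and a real implementation would parse according to `format` instead of relying on a plain cast.

```
// Hypothetical extractor that derives the rowtime from a String field.
class StringTimestampExtractor(field: String, format: String) extends TimestampExtractor {

  // The extractor reads exactly one existing field of the source schema.
  override def getArgumentFields: Array[String] = Array(field)

  // Express the extraction logic as a Table API expression over the field;
  // 'format' only marks where a timestamp pattern would be applied.
  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression =
    Cast(fieldAccesses(0), Types.SQL_TIMESTAMP)
}
```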
[jira] [Created] (FLINK-7863) Make MetricFetcher work with RestfulGateway
Till Rohrmann created FLINK-7863: Summary: Make MetricFetcher work with RestfulGateway Key: FLINK-7863 URL: https://issues.apache.org/jira/browse/FLINK-7863 Project: Flink Issue Type: Sub-task Components: REST Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann In order to make the {{MetricFetcher}} work together with the new architecture, we have to remove its dependence on the {{JobManagerGateway}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7794) Link Broken in -- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/index.html
[ https://issues.apache.org/jira/browse/FLINK-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paul Wu resolved FLINK-7794. Resolution: Fixed Release Note: Someone fixed the issue. > Link Broken in -- > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/index.html > --- > > Key: FLINK-7794 > URL: https://issues.apache.org/jira/browse/FLINK-7794 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.3.0 >Reporter: Paul Wu >Priority: Minor > > Broken url link "predefined data sources" in page > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/index.html. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7839) Creating Quickstart project for SNAPSHOT version fails
[ https://issues.apache.org/jira/browse/FLINK-7839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207807#comment-16207807 ] ASF GitHub Bot commented on FLINK-7839: --- GitHub user mcfongtw opened a pull request: https://github.com/apache/flink/pull/4848 [FLINK-7839] [doc] Add a note on possible maven failure for creating quickstart project ## What is the purpose of the change This only applies before project release; {%site.is_stable == false %}. Add a warning note on recent Maven change (3.0+) for -DarchetypeCatalog while creating quickstart project. ## Verifying this change Only doc change You can merge this pull request into a Git repository by running: $ git pull https://github.com/mcfongtw/flink FLINK-7839 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4848.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4848 commit 499724706ac72ea2d5e4e2c1fa5655385b91a3c6 Author: Michael Fong Date: 2017-10-17T15:32:18Z [FLINK-7839] [doc] Add a note on possible maven failure for creating quickstart project. This only applies before project release; {%site.is_stable == false %}. Add a warning note on recent Maven change (3.0+) for -DarchetypeCatalog while creating quickstart project. > Creating Quickstart project for SNAPSHOT version fails > -- > > Key: FLINK-7839 > URL: https://issues.apache.org/jira/browse/FLINK-7839 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: Michael Fong >Priority: Blocker > Labels: documentation > Fix For: 1.4.0 > > > The documentation on creating quickstart projects is broken for SNAPSHOT > releases. For example, the documentation suggests to use the following > command to generate a Flink 1.4-SNAPSHOT project using maven archetypes: > {code} > mvn archetype:generate \ > -DarchetypeGroupId=org.apache.flink \ > -DarchetypeArtifactId=flink-quickstart-java \ > > -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \ > -DarchetypeVersion=1.4-SNAPSHOT > {code} > The command fails with the error: > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-archetype-plugin:3.0.1:generate (default-cli) > on project flink-training-exercises: archetypeCatalog > 'https://repository.apache.org/content/repositories/snapshots/' is not > supported anymore. Please read the plugin documentation for details. -> [Help > 1] > {code} > This also affects the quickstart script. > Since version 3.0.0, the archetype plugin does not allow to specify > repositories as command line arguments. See > http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7849) Remove guava shading from hcatalog connector
[ https://issues.apache.org/jira/browse/FLINK-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-7849: Priority: Minor (was: Blocker) > Remove guava shading from hcatalog connector > > > Key: FLINK-7849 > URL: https://issues.apache.org/jira/browse/FLINK-7849 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 > > > Same issue as FLINK-7846. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207782#comment-16207782 ] Xingcan Cui commented on FLINK-7548: Hi [~fhueske], sorry for the late reply. I'll first share some ideas that struck me and then keep the focus on this issue. # Now that the rowtime can be extracted by expressions, shall we consider allowing more data types for the original fields? For instance, the users may want to extract the timestamps from a {{String}} field or even from multiple fields by a UDF. # Since the stream API does not support multiple watermarks, maybe it's not necessary to define a list of {{RowtimeAttributeDescriptor}}. # Sometimes the processing time could be inapplicable, while the streams lack timestamps. I think it would be convenient to provide an internal mechanism that can materialize the ingestion time as the rowtime. # Using Table API Expression sounds like a good idea. However, I just wonder whether it will make the DDL harder to implement. Best, Xingcan > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7846) Remove guava shading from ES2 connector
[ https://issues.apache.org/jira/browse/FLINK-7846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-7846: Priority: Minor (was: Blocker) > Remove guava shading from ES2 connector > --- > > Key: FLINK-7846 > URL: https://issues.apache.org/jira/browse/FLINK-7846 > Project: Flink > Issue Type: Bug > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 > > > The ElasticSearch 2 connector pom has a shading configuration for guava. The > only user of guava is the elasticsearch dependency, which is not included in > the jar, making the shading pointless. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7783) Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
[ https://issues.apache.org/jira/browse/FLINK-7783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207783#comment-16207783 ] Aljoscha Krettek commented on FLINK-7783: - We should never delete checkpoints that we fail to retrieve but always keep them. Checkpoint deletion should only happen if/when they are subsumed by newer checkpoints. > Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover() > -- > > Key: FLINK-7783 > URL: https://issues.apache.org/jira/browse/FLINK-7783 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, we always delete checkpoint handles if they (or the data from the > DFS) cannot be read: > https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180 > This can lead to problems in case the DFS is temporarily not available, i.e. > we could inadvertently > delete all checkpoints even though they are still valid. > A user reported this problem on the mailing list: > https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
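A minimal sketch of the tolerant recovery loop this implies, written in Scala for brevity and with hypothetical names (`LOG`, the handle sequence as a parameter); the actual `ZooKeeperCompletedCheckpointStore` is Java code and the final fix may differ:

```
// Keep handles whose state cannot currently be read instead of discarding
// them: a transient DFS outage must not become permanent checkpoint loss.
def recover(handles: Seq[RetrievableStateHandle[CompletedCheckpoint]]): Seq[CompletedCheckpoint] =
  handles.flatMap { handle =>
    try Some(handle.retrieveState())
    catch {
      case e: Exception =>
        // Log and skip, but do NOT remove the node from ZooKeeper here.
        LOG.warn("Could not retrieve checkpoint; keeping it for a later retry.", e)
        None
    }
  }
```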
[GitHub] flink pull request #4847: [hotfix][doc]CEP docs review to remove weasel word...
GitHub user ChrisChinchilla opened a pull request: https://github.com/apache/flink/pull/4847 [hotfix][doc]CEP docs review to remove weasel words, fix passive voice, typos… As requested, backport from 1.4. Ref - https://github.com/apache/flink/pull/4816 You can merge this pull request into a Git repository by running: $ git pull https://github.com/ChrisChinchilla/flink release-1.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4847.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4847 commit 9921c1b4d468d68f27094dd95345df8135c31730 Author: Chris Ward Date: 2017-10-17T14:36:48Z CEP docs review to remove weasel words, fix passive voice, typos and formatting Backport from 1.4 ---
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207717#comment-16207717 ] Timo Walther commented on FLINK-7548: - Thanks for the proposal [~fhueske]. I like the idea with a separate {{TableSchema}} for sources that a user can implement. Maybe we can simplify the design even more by extending {{TableSchema}} with additional methods like {{setRowtime(TimestampExtractor, WatermarkAssigner), setProctime()}}. Then, we would not need the {{DefinedProctimeAttribute}} and {{DefinedRowtimeAttributes}} interfaces anymore. If we want to change the logic in the future, we don't have to modify interfaces but can deprecate or add additional methods to {{TableSchema}}. Having {{setRowtime/setProctime()}} would also hide the internal {{TimeIndicatorType}} but would not require defining a field name twice (in TableSchema and DefinedProctimeAttribute). What do you think? > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
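For concreteness, a source built on this proposal might populate its schema roughly like so. This is entirely hypothetical, since `setRowtime`/`setProctime` did not exist: `ExistingField` is the extractor mentioned in the discussion above, and `BoundedOutOfOrderTimestamps` is an illustrative watermark assigner name.

```
// Hypothetical usage of the proposed TableSchema methods.
val schema = new TableSchema(
  Array("user", "eventTime", "payload"),
  Array(Types.STRING, Types.SQL_TIMESTAMP, Types.STRING))

// Declare "eventTime" as the rowtime attribute: one extractor, one assigner,
// and the field is listed only once.
schema.setRowtime(new ExistingField("eventTime"), new BoundedOutOfOrderTimestamps(5000L))

// Append a processing-time attribute without a separate interface.
schema.setProctime("procTime")
```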
[jira] [Commented] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
[ https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207674#comment-16207674 ] ASF GitHub Bot commented on FLINK-6173: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4837 the table examples no longer build on travis. > flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414 > > > Key: FLINK-6173 > URL: https://issues.apache.org/jira/browse/FLINK-6173 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Zhenghua Gao >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.4.0 > > > Currently, flink-table will pack-in com.fasterxml.jackson.* and rename them > to org.apache.flink.shaded.calcite.com.fasterxml.jackson.* > If a project depends on flink-table, and uses fasterxml as follows (function explain uses fasterxml indirectly):
> {code:title=WordCount.scala|borderStyle=solid}
> object WordCountWithTable {
>   def main(args: Array[String]): Unit = {
>     // set up execution environment
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>     val expr = input.toTable(tEnv)
>     val result = expr
>       .groupBy('word)
>       .select('word, 'frequency.sum as 'frequency)
>       .filter('frequency === 2)
>     println(tEnv.explain(result))
>     result.toDataSet[WC].print()
>   }
>   case class WC(word: String, frequency: Long)
> }
> {code}
> It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.* > I found after FLINK-5414, flink-table didn't pack-in com.fasterxml.jackson.* > and the project would throw class not found exception.
> {code:borderStyle=solid}
> Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper
>   at org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32)
>   at org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143)
>   at org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164)
>   at org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34)
>   at org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 10 more
> {code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4837: [FLINK-6173] [table] Clean-up flink-table jar and depende...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4837 The table examples no longer build on Travis. ---
[GitHub] flink pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleFor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145132472 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() + .useParallelExecutor(Runtime.getRuntime().availableProcessors()) + .addUrls(ClasspathHelper.forPackage("org.apache.flink")) + .addScanners(new MemberUsageScanner(), + new MethodAnnotationsScanner())); + + Set<Method> methods = reflections.getMethodsAnnotatedWith(VisibleForTesting.class); + + for (Method method : methods) { + Set<Member> usages = reflections.getMethodUsage(method); + for (Member member : usages) { + if (member instanceof Method) { + Method methodHopeWithTestAnnotation = (Method) member; --- End diff -- nit: maybe rename `methodHopeWithTestAnnotation` to `visibleForTestingUsageScope`? Or something else? ---
[GitHub] flink pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleFor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145126703 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() + .useParallelExecutor(Runtime.getRuntime().availableProcessors()) + .addUrls(ClasspathHelper.forPackage("org.apache.flink")) + .addScanners(new MemberUsageScanner(), + new MethodAnnotationsScanner())); + + Set<Method> methods = reflections.getMethodsAnnotatedWith(VisibleForTesting.class); + + for (Method method : methods) { + Set<Member> usages = reflections.getMethodUsage(method); + for (Member member : usages) { + if (member instanceof Method) { + Method methodHopeWithTestAnnotation = (Method) member; + if (!methodHopeWithTestAnnotation.isAnnotationPresent(Test.class)) { --- End diff -- What if the usage is in tests, but wrapped by some test class/mock? Can we check whether the usage is in the `src/test` path? ---
[GitHub] flink pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleFor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145125984 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() + .useParallelExecutor(Runtime.getRuntime().availableProcessors()) + .addUrls(ClasspathHelper.forPackage("org.apache.flink")) + .addScanners(new MemberUsageScanner(), --- End diff -- If you fold arguments, please also fold the first one: ``` .addScanners( new MemberUsageScanner(), new MethodAnnotationsScanner()) ``` ---
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207657#comment-16207657 ] ASF GitHub Bot commented on FLINK-6444: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145125984 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() + .useParallelExecutor(Runtime.getRuntime().availableProcessors()) + .addUrls(ClasspathHelper.forPackage("org.apache.flink")) + .addScanners(new MemberUsageScanner(), --- End diff -- If you fold arguments, please also fold the first one: ``` .addScanners( new MemberUsageScanner(), new MethodAnnotationsScanner()) ``` > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: mingleizhang > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207655#comment-16207655 ] ASF GitHub Bot commented on FLINK-6444: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145125538 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() --- End diff -- Could you extract the `ConfigurationBuilder` to a local variable? Right now it is hard to read. > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: mingleizhang > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207658#comment-16207658 ] ASF GitHub Bot commented on FLINK-6444: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145132472 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() + .useParallelExecutor(Runtime.getRuntime().availableProcessors()) + .addUrls(ClasspathHelper.forPackage("org.apache.flink")) + .addScanners(new MemberUsageScanner(), + new MethodAnnotationsScanner())); + + Set<Method> methods = reflections.getMethodsAnnotatedWith(VisibleForTesting.class); + + for (Method method : methods) { + Set<Member> usages = reflections.getMethodUsage(method); + for (Member member : usages) { + if (member instanceof Method) { + Method methodHopeWithTestAnnotation = (Method) member; --- End diff -- nit: maybe rename `methodHopeWithTestAnnotation` to `visibleForTestingUsageScope`? Or something else? > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: mingleizhang > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207659#comment-16207659 ] ASF GitHub Bot commented on FLINK-6444: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145126703 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() + .useParallelExecutor(Runtime.getRuntime().availableProcessors()) + .addUrls(ClasspathHelper.forPackage("org.apache.flink")) + .addScanners(new MemberUsageScanner(), + new MethodAnnotationsScanner())); + + Set<Method> methods = reflections.getMethodsAnnotatedWith(VisibleForTesting.class); + + for (Method method : methods) { + Set<Member> usages = reflections.getMethodUsage(method); + for (Member member : usages) { + if (member instanceof Method) { + Method methodHopeWithTestAnnotation = (Method) member; + if (!methodHopeWithTestAnnotation.isAnnotationPresent(Test.class)) { --- End diff -- What if the usage is in tests, but wrapped by some test class/mock? Can we check whether the usage is in the `src/test` path? > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: mingleizhang > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleFor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145133848 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -439,7 +439,7 @@ public CheckpointingMode getCheckpointingMode() { * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for * the key/value state, and for checkpointed functions (implementing the interface -* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}). +* {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}). --- End diff -- > (for example implementing the interface {@link org.apache.flink.streaming.api.checkpoint.ListCheckpointed}) `CheckpointedFunction` is also deprecated ---
[GitHub] flink pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleFor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145125538 --- Diff: flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.manual; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.junit.Test; +import org.reflections.Reflections; +import org.reflections.scanners.MemberUsageScanner; +import org.reflections.scanners.MethodAnnotationsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; + +import java.lang.reflect.Member; +import java.lang.reflect.Method; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * This test checks for methods that are annotated with @VisibleForTesting but are still called from + * classes that do not belong to the tests. Such methods should only be called from tests. + */ +public class CheckVisibleForTestingUsage { + + @Test + public void testCheckVisibleForTesting() throws Exception { + final Reflections reflections = new Reflections(new ConfigurationBuilder() --- End diff -- Could you extract the `ConfigurationBuilder` to a local variable? Right now it is hard to read. ---
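Taken together, the review comments on this file sketch a possible reshaping of the test. The following is only one way they could be applied, not the code that was eventually merged: the `ConfigurationBuilder` is extracted to a local variable, the folded arguments include the first one, and instead of requiring an `@Test` annotation on the caller, the check accepts any usage whose declaring class was loaded from a test artifact. The `test-classes` location heuristic and the `isLoadedFromTestCode` helper are assumptions for illustration.

```
package org.apache.flink.test.manual;

import org.apache.flink.annotation.VisibleForTesting;

import org.junit.Test;
import org.reflections.Reflections;
import org.reflections.scanners.MemberUsageScanner;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

import java.lang.reflect.Member;
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;
import java.util.Set;

import static org.junit.Assert.fail;

/**
 * Sketch of the @VisibleForTesting usage check with the review comments applied.
 */
public class CheckVisibleForTestingUsage {

	@Test
	public void testCheckVisibleForTesting() throws Exception {
		// Extracted to a local variable for readability, as suggested in the review.
		final ConfigurationBuilder configuration = new ConfigurationBuilder()
			.useParallelExecutor(Runtime.getRuntime().availableProcessors())
			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
			.addScanners(
				new MemberUsageScanner(),
				new MethodAnnotationsScanner());

		final Reflections reflections = new Reflections(configuration);

		final Set<Method> annotatedMethods = reflections.getMethodsAnnotatedWith(VisibleForTesting.class);

		for (Method method : annotatedMethods) {
			for (Member usage : reflections.getMethodUsage(method)) {
				// Instead of requiring @Test on the calling method, accept any caller
				// that was loaded from a test artifact (e.g. target/test-classes).
				if (!isLoadedFromTestCode(usage.getDeclaringClass())) {
					fail("Method " + method + " is @VisibleForTesting but is called from " + usage);
				}
			}
		}
	}

	private static boolean isLoadedFromTestCode(Class<?> clazz) {
		final CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
		if (codeSource == null) {
			return false;
		}
		final URL location = codeSource.getLocation();
		return location != null && location.getPath().contains("test-classes");
	}
}
```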
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207656#comment-16207656 ] ASF GitHub Bot commented on FLINK-6444: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4705#discussion_r145133848 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -439,7 +439,7 @@ public CheckpointingMode getCheckpointingMode() { * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for * the key/value state, and for checkpointed functions (implementing the interface -* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}). +* {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}). --- End diff -- > (for example implementing the interface {@link org.apache.flink.streaming.api.checkpoint.ListCheckpointed}) `CheckpointedFunction` is also deprecated > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: mingleizhang > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
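For reference, the `ListCheckpointed` interface that the review comment points to is used roughly as follows in Flink 1.4-era code; the counting mapper itself is a made-up example for illustration.

```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

import java.util.Collections;
import java.util.List;

/**
 * Example counter that keeps its state through the ListCheckpointed interface
 * referenced above.
 */
public class CountingMapper extends RichMapFunction<String, Long> implements ListCheckpointed<Long> {

	private long count;

	@Override
	public Long map(String value) {
		return ++count;
	}

	@Override
	public List<Long> snapshotState(long checkpointId, long timestamp) {
		// Return the operator state as a list; Flink redistributes the
		// list elements across subtasks on rescaling.
		return Collections.singletonList(count);
	}

	@Override
	public void restoreState(List<Long> state) {
		count = 0;
		for (Long c : state) {
			count += c;
		}
	}
}
```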
[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207654#comment-16207654 ] ASF GitHub Bot commented on FLINK-7648: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4824 The handler related commits look good to me. > Port TaskManagersHandler to new REST endpoint > - > > Key: FLINK-7648 > URL: https://issues.apache.org/jira/browse/FLINK-7648 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{TaskManagersHandler}} to the new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4824: [FLINK-7648] [flip6] Add TaskManagersHandler
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4824 The handler related commits look good to me. ---
[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots
[ https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207634#comment-16207634 ] ASF GitHub Bot commented on FLINK-7832: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4823#discussion_r145129240 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java --- @@ -74,22 +83,51 @@ public AllocationID getAllocationId() { return allocationId; } - public void setAllocationId(AllocationID allocationId) { - this.allocationId = allocationId; - } public PendingSlotRequest getAssignedSlotRequest() { return assignedSlotRequest; } - public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) { - this.assignedSlotRequest = assignedSlotRequest; - } public InstanceID getInstanceId() { return taskManagerConnection.getInstanceID(); } + public void freeSlot() { + Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it."); + + state = State.FREE; + allocationId = null; + } + + public void clearPendingSlotRequest() { + Preconditions.checkState(state == State.PENDING, "No slot request to clear."); + + state = State.FREE; + assignedSlotRequest = null; + } + + public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { + Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request."); + + state = State.PENDING; + assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest); + } + + public void completeAllocation(AllocationID allocationId) { + Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated."); + Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request."); + + state = State.ALLOCATED; + this.allocationId = Preconditions.checkNotNull(allocationId); + assignedSlotRequest = null; + } + + public void updateAllocation(AllocationID allocationId) { --- End diff -- Can you explain the difference between updateAllocation and completeAllocation in terms of when they are used? (I would've expected the slot lifecycle to be along the lines of FREE <-> PENDING -> ALLOCATED -> FREE, but this doesn't appear to be the case, as a free slot can be allocated without ever being in a pending state.) > SlotManager should return number of registered slots > > > Key: FLINK-7832 > URL: https://issues.apache.org/jira/browse/FLINK-7832 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > The {{SlotManager}} should provide information about the number of registered > slots for a {{TaskExecutor}} and how many of these slots are still free. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
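As a reading aid for the question above, the transitions that the `Preconditions` in this diff enforce can be reduced to a small state machine. This enum is illustrative only, not the actual `TaskManagerSlot` implementation; the direct FREE -> ALLOCATED edge stands in for the `updateAllocation` path zentol asks about.

```
/**
 * Illustrative reduction of the slot lifecycle discussed above.
 */
enum SlotState {
	FREE, PENDING, ALLOCATED;

	/** Transitions enforced by the Preconditions in the diff above. */
	boolean canTransitionTo(SlotState target) {
		switch (this) {
			case FREE:
				// assignPendingSlotRequest (FREE -> PENDING), or a direct allocation
				// reported by the TaskManager (FREE -> ALLOCATED, the updateAllocation path).
				return target == PENDING || target == ALLOCATED;
			case PENDING:
				// completeAllocation (PENDING -> ALLOCATED) or
				// clearPendingSlotRequest (PENDING -> FREE).
				return target == ALLOCATED || target == FREE;
			case ALLOCATED:
				// freeSlot (ALLOCATED -> FREE).
				return target == FREE;
			default:
				return false;
		}
	}
}
```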
[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots
[ https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207636#comment-16207636 ] ASF GitHub Bot commented on FLINK-7832: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4823#discussion_r145117077 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -418,26 +415,17 @@ public void freeSlot(SlotID slotId, AllocationID allocationId) { TaskManagerSlot slot = slots.get(slotId); if (null != slot) { - if (slot.isAllocated()) { + if (slot.getState() == TaskManagerSlot.State.ALLOCATED) { if (Objects.equals(allocationId, slot.getAllocationId())) { - // free the slot - slot.setAllocationId(null); - fulfilledSlotRequests.remove(allocationId); - - if (slot.isFree()) { - handleFreeSlot(slot); - } TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); - if (null != taskManagerRegistration) { - if (anySlotUsed(taskManagerRegistration.getSlots())) { - taskManagerRegistration.markUsed(); - } else { - taskManagerRegistration.markIdle(); - } + if (taskManagerRegistration == null) { + throw new IllegalStateException("Trying to free a slot from a TaskManager " + --- End diff -- Would it be useful to also have the slot ID in the exception? > SlotManager should return number of registered slots > > > Key: FLINK-7832 > URL: https://issues.apache.org/jira/browse/FLINK-7832 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > The {{SlotManager}} should provide information about the number of registered > slots for a {{TaskExecutor}} and how many of these slots are still free. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots
[ https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207632#comment-16207632 ] ASF GitHub Bot commented on FLINK-7832: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4823#discussion_r145119294 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -556,14 +537,30 @@ private void registerSlot( * @return True if the slot could be updated; otherwise false */ private boolean updateSlot(SlotID slotId, AllocationID allocationId) { - TaskManagerSlot slot = slots.get(slotId); + final TaskManagerSlot slot = slots.get(slotId); - if (null != slot) { - // we assume the given allocation id to be the ground truth (coming from the TM) - slot.setAllocationId(allocationId); + if (slot != null) { + final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); + + if (taskManagerRegistration != null) { + updateSlotInternal(slot, taskManagerRegistration, allocationId); + + return true; + } else { + throw new IllegalStateException("Trying to update a slot from a TaskManager " + + slot.getInstanceId() + " which has not been registered."); + } + } else { + LOG.debug("Trying to update unknown slot with slot id {}.", slotId); - if (null != allocationId) { - if (slot.hasPendingSlotRequest()){ + return false; + } + } + + private void updateSlotInternal(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId) { --- End diff -- isn't it odd to have an `updateSlotInternal` method, when a private `updateSlot` method already exists? (more or less a naming issue) > SlotManager should return number of registered slots > > > Key: FLINK-7832 > URL: https://issues.apache.org/jira/browse/FLINK-7832 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > The {{SlotManager}} should provide information about the number of registered > slots for a {{TaskExecutor}} and how many of these slots are still free. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots
[ https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207635#comment-16207635 ] ASF GitHub Bot commented on FLINK-7832: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4823#discussion_r145117281 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -556,14 +537,30 @@ private void registerSlot( * @return True if the slot could be updated; otherwise false */ private boolean updateSlot(SlotID slotId, AllocationID allocationId) { - TaskManagerSlot slot = slots.get(slotId); + final TaskManagerSlot slot = slots.get(slotId); - if (null != slot) { - // we assume the given allocation id to be the ground truth (coming from the TM) - slot.setAllocationId(allocationId); + if (slot != null) { + final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); + + if (taskManagerRegistration != null) { + updateSlotInternal(slot, taskManagerRegistration, allocationId); + + return true; + } else { + throw new IllegalStateException("Trying to update a slot from a TaskManager " + --- End diff -- same as above > SlotManager should return number of registered slots > > > Key: FLINK-7832 > URL: https://issues.apache.org/jira/browse/FLINK-7832 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > The {{SlotManager}} should provide information about the number of registered > slots for a {{TaskExecutor}} and how many of these slots are still free. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7832) SlotManager should return number of registered slots
[ https://issues.apache.org/jira/browse/FLINK-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16207633#comment-16207633 ] ASF GitHub Bot commented on FLINK-7832: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4823#discussion_r144810955 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java --- @@ -74,22 +83,51 @@ public AllocationID getAllocationId() { return allocationId; } - public void setAllocationId(AllocationID allocationId) { - this.allocationId = allocationId; - } public PendingSlotRequest getAssignedSlotRequest() { return assignedSlotRequest; } - public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) { - this.assignedSlotRequest = assignedSlotRequest; - } public InstanceID getInstanceId() { return taskManagerConnection.getInstanceID(); } + public void freeSlot() { + Preconditions.checkState(state == State.ALLOCATED, "Slot must be allocated before freeing it."); + + state = State.FREE; + allocationId = null; + } + + public void clearPendingSlotRequest() { + Preconditions.checkState(state == State.PENDING, "No slot request to clear."); + + state = State.FREE; + assignedSlotRequest = null; + } + + public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { + Preconditions.checkState(state == State.FREE, "Slot must be free to be assigned a slot request."); + + state = State.PENDING; + assignedSlotRequest = Preconditions.checkNotNull(pendingSlotRequest); + } + + public void completeAllocation(AllocationID allocationId) { + Preconditions.checkState(state == State.PENDING, "In order to complete an allocation, the slot has to be allocated."); + Preconditions.checkState(Objects.equals(allocationId, assignedSlotRequest.getAllocationId()), "Mismatch between allocation id of the pending slot request."); + + state = State.ALLOCATED; + this.allocationId = Preconditions.checkNotNull(allocationId); --- End diff -- The null check should be done before comparing the id to the request's allocation id. > SlotManager should return number of registered slots > > > Key: FLINK-7832 > URL: https://issues.apache.org/jira/browse/FLINK-7832 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > The {{SlotManager}} should provide information about the number of registered > slots for a {{TaskExecutor}} and how many of these slots are still free. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
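Applying the comment above, the null check simply moves ahead of the comparison. A sketch of the suggested ordering, written as the method would read inside the `TaskManagerSlot` class shown in the diff (it relies on the `state`, `allocationId`, and `assignedSlotRequest` fields from that class, and is not necessarily the code that was merged):

```
public void completeAllocation(AllocationID allocationId) {
	// Validate the argument first, as suggested, before comparing it
	// against the pending request's allocation id.
	Preconditions.checkNotNull(allocationId, "Allocation id must not be null.");
	Preconditions.checkState(state == State.PENDING,
		"In order to complete an allocation, the slot has to be pending.");
	Preconditions.checkState(allocationId.equals(assignedSlotRequest.getAllocationId()),
		"Mismatch between the given allocation id and the allocation id of the pending slot request.");

	state = State.ALLOCATED;
	this.allocationId = allocationId;
	assignedSlotRequest = null;
}
```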