[jira] [Comment Edited] (FLINK-10966) Optimize the release blocking logic in BarrierBuffer
[ https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695609#comment-16695609 ] vinoyang edited comment on FLINK-10966 at 11/22/18 6:58 AM:

Hi [~zjwang], to answer your questions:

1) For 1.5+ you may be right, but we hit this problem on Flink 1.3.2. My point is that we should not simply store the data: with a large volume of data, both memory and disk are limited by capacity. My suggestion is to release the blocking as soon as possible so that the data can participate in the computation.

2) In the scenario I described, if the upstream blocks one input channel and the other input channels are marked as blocked, then checkpoint id2 never gets a chance to see a CheckpointBarrier event, so the blocking will not be released (execution falls into the if branch of the code I provided).

3) The current checkpoint trigger mechanism is periodic. I think we should not speculate about the cause of a checkpoint timeout (it may have various causes, such as back pressure or the logic of the source itself; the latter is what we encountered), and the checkpoint trigger logic should not change. I only suggested that after a checkpoint timeout, the CheckpointCoordinator could trigger the tasks to cancel the last round of blocking.
[jira] [Commented] (FLINK-10966) Optimize the release blocking logic in BarrierBuffer
[ https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695609#comment-16695609 ] vinoyang commented on FLINK-10966: -- Hi [~zjwang], to answer your questions: 1) For 1.5+ you may be right, but we hit this problem on Flink 1.3.2. My point is that we should not simply store the data: with a large volume of data, both memory and disk are limited by capacity. My suggestion is to release the blocking as soon as possible so that the data can participate in the computation. 2) In the scenario I described, if the upstream blocks one input channel and the other input channels are marked as blocked, then checkpoint id2 never gets a chance to see a CheckpointBarrier event, so the blocking will not be released (execution falls into the if branch of the code I provided). 3) The current checkpoint trigger mechanism is periodic. I think we should not speculate about the cause of a checkpoint timeout (it may have various causes, such as back pressure or the logic of the source itself; the latter is what we encountered), and the checkpoint trigger logic should not change. I only suggested that after a checkpoint timeout, the CheckpointCoordinator could trigger the tasks to cancel the last round of blocking.

> Optimize the release blocking logic in BarrierBuffer
> ----------------------------------------------------
>
> Key: FLINK-10966
> URL: https://issues.apache.org/jira/browse/FLINK-10966
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
>
> Issue:
> Currently, mixing CancelCheckpointMarker control events into the data flow to drive tasks to release the blocking logic in BarrierBuffer may result in the blocking not being released in time, further leading to a large amount of data being spilled to disk.
> The source code for this problem is as follows:
{code:java}
BufferOrEvent bufferOrEvent = next.get();
if (isBlocked(bufferOrEvent.getChannelIndex())) { // issue line
    // if the channel is blocked, we just store the BufferOrEvent
    bufferBlocker.add(bufferOrEvent);
    checkSizeLimit();
}
else if (bufferOrEvent.isBuffer()) {
    return bufferOrEvent;
}
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
    if (!endOfStream) {
        // process barriers only if there is a chance of the checkpoint completing
        processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
    }
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
    processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
}
else {
    if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
        processEndOfPartition();
    }
    return bufferOrEvent;
}
{code}
> Scenarios:
> Consider a simple DAG, source->map (network shuffle), with parallelism 10 and exactly-once checkpoint semantics.
> First checkpoint: barriers from 9 source subtasks are received by all map subtasks. One source subtask is blocked and fails to send its barrier, so the checkpoint eventually fails due to timeout. At this point, the 9 corresponding input channels are blocked because they have received a barrier.
> Second checkpoint: the special source subtask is still blocked and cannot send any events downstream, while the nine input channels remain blocked. Under the current implementation, the data and events received on them will not be processed but stored directly, so the blocking in the downstream task will not be released. The only hope is that the cached data reaches the maximum limit.
> I think the main problem here is that we should not directly store data that comes from blocked input channels.
> Especially when one input channel is blocked by the upstream and nine input channels are marked as blocked, we may never be able to release the blocking.
> A better mechanism might be to send a notifyCheckpointFailed callback via the CheckpointCoordinator, allowing each task to unblock itself. This mechanism makes releasing the old checkpoint alignment independent of triggering a new checkpoint. If the checkpoint interval is long but the timeout is short, the effect of the optimization is even more obvious.
> Ultimately, we want to reduce unnecessary blocking and spilling of data to disk.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
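As a rough, self-contained illustration of the proposal in the discussion above — checking for a cancellation before the blocked-channel early return, so that even a blocked channel can release the alignment — consider this simplified sketch. It is not Flink's actual BarrierBuffer; `MiniBarrierBuffer`, `onElement`, and the boolean marker flag are all hypothetical stand-ins:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: process a cancellation marker *before* the
// blocked-channel early return, so blocked channels can still release
// the checkpoint alignment. Not Flink's real BarrierBuffer code.
class MiniBarrierBuffer {
    private final Set<Integer> blockedChannels = new HashSet<>();
    private int buffered = 0;

    void block(int channel) { blockedChannels.add(channel); }

    boolean isBlocked(int channel) { return blockedChannels.contains(channel); }

    int bufferedCount() { return buffered; }

    /** Returns true if the element is forwarded, false if it is stored. */
    boolean onElement(int channel, boolean isCancelMarker) {
        if (isCancelMarker) {
            // handle the cancellation even when this channel is blocked,
            // releasing all blocked channels immediately
            blockedChannels.clear();
            return true;
        }
        if (isBlocked(channel)) {
            // only plain data from blocked channels is stored now
            buffered++;
            return false;
        }
        return true;
    }
}
```

The contrast with the quoted code is the ordering: the marker check happens before `isBlocked`, so a cancellation arriving on a blocked channel is no longer silently buffered.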
[jira] [Commented] (FLINK-10820) Simplify the RebalancePartitioner implementation
[ https://issues.apache.org/jira/browse/FLINK-10820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695585#comment-16695585 ] ASF GitHub Bot commented on FLINK-10820: zhijiangW commented on issue #7051: [FLINK-10820][network] Simplify the RebalancePartitioner implementation URL: https://github.com/apache/flink/pull/7051#issuecomment-440926347 @pnowojski , I squashed the commits to address the above comments, which introduce a `setup` method in the `ChannelSelector` interface. I rebased the commit on FLINK-10942, so please refer directly to the second commit for this PR's changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Simplify the RebalancePartitioner implementation
> ------------------------------------------------
>
> Key: FLINK-10820
> URL: https://issues.apache.org/jira/browse/FLINK-10820
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.8.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
>
> _The current {{RebalancePartitioner}} implementation seems a little hacky: it selects a random number as the first channel index and makes the following selections in round-robin fashion based on that index._
> _In particular, for the corner case of {{numChannels = Integer.MAX_VALUE}}, it triggers a new random index once the last channel index is reached. Actually, the random index should be selected only once, for the first selection._

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
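The intended behavior described in the issue — pick a random start channel exactly once, then advance strictly round-robin with no further randomness, even at wrap-around — can be sketched as follows. This is a minimal illustration, not the actual `RebalancePartitioner` code; the class and method names here are made up:

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of round-robin channel selection: a random index is
// chosen only for the very first selection, and every later selection just
// advances by one modulo numChannels (including at wrap-around).
class RoundRobinSelector {
    private int nextChannel = -1; // -1 means "not yet initialized"

    int selectChannel(int numChannels) {
        if (nextChannel < 0) {
            // random start, chosen exactly once
            nextChannel = ThreadLocalRandom.current().nextInt(numChannels);
        } else {
            nextChannel = (nextChannel + 1) % numChannels;
        }
        return nextChannel;
    }
}
```

Note that `(nextChannel + 1) % numChannels` handles the `Integer.MAX_VALUE` corner case without overflow, since `nextChannel` is always strictly less than `numChannels` before the increment.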
[jira] [Commented] (FLINK-10966) Optimize the release blocking logic in BarrierBuffer
[ https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695584#comment-16695584 ] zhijiang commented on FLINK-10966: -- I have some concerns about this issue:
# In the credit-based flow control mode introduced in release 1.5, a blocked channel caches its data in memory instead of spilling to disk as before, so the effect of the blocking seems limited.
# If checkpoint id1 times out, checkpoint id2 is then triggered by the coordinator. Tasks release previously blocked channels when they receive a new checkpoint id, so as long as the back pressure is not extreme, the new checkpoint id should not arrive with much delay.
# The proposed RPC CancelCheckpointMarker message, sent before the coordinator triggers a new checkpoint id, can indeed speed up releasing the tasks' blocked channels in some scenarios, such as back pressure. But I wonder whether it is necessary to trigger another checkpoint before the previous one succeeds. In other words, if the current checkpoint times out, the newly triggered checkpoint may time out as well; this does not solve the essential reason for the timeout.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10926) Fix the problem for function TIMESTAMPDIFF in Table
[ https://issues.apache.org/jira/browse/FLINK-10926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuqianjin closed FLINK-10926. - Resolution: Fixed Fix Version/s: 1.7.0

> Fix the problem for function TIMESTAMPDIFF in Table
> ---------------------------------------------------
>
> Key: FLINK-10926
> URL: https://issues.apache.org/jira/browse/FLINK-10926
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.6.2
> Reporter: xuqianjin
> Priority: Minor
> Fix For: 1.7.0
>
> Attachments: image-2018-11-19-18-33-47-389.png, image-2018-11-19-22-23-09-554.png
>
> Using the following SQL statement:
> val result3 = tEnv.sqlQuery("select TIMESTAMPDIFF(MINUTE,'2012-08-24 09:00:00','2012-08-30 12:00:00')")
> the following error occurred:
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 72: No match found for function signature TIMESTAMPDIFF(, , )
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Expected: the time difference is returned correctly.
>
> val result3 = tEnv.sqlQuery("select TIMESTAMPDIFF (MINUTE, TIMESTAMP '2012-08-24 09:00:00', TIMESTAMP '2012-08-30 12:00:00')")
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 95: No match found for function signature TIMESTAMPDIFF(, , )
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695574#comment-16695574 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235608590 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. 
+ *
+ * @param <T> Type of the element to filter
+ */
+public class RichAndCondition<T> extends RichCompositeIterativeCondition<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public RichAndCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
+		super(left, right);
+	}
+
+	@Override
+	public boolean filter(T value, Context<T> ctx) throws Exception {
+		IterativeCondition<T> left = getLeft();
+		IterativeCondition<T> right = getRight();
+		return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx));

Review comment: @dawidwys Thank you for the reminder! Can you explain more about `condition() == null`?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Pattern(Flat)SelectFunctions should support RichFunction interface
> ------------------------------------------------------------------
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extend {{AbstractRichFunction}} and process properly if the underlying functions extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point we have to open also on control message arrival

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10983) NonHAQueryableStateFsBackendITCase failed when run mvn install
[ https://issues.apache.org/jira/browse/FLINK-10983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-10983: Component/s: Queryable State

> NonHAQueryableStateFsBackendITCase failed when run mvn install
> --------------------------------------------------------------
>
> Key: FLINK-10983
> URL: https://issues.apache.org/jira/browse/FLINK-10983
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.7.0
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> I tried to test 1.7.0-rc2 and found the following error. I think we should harden this test by increasing the port range.
> Since the 1.7 release is around the corner, I will try to fix it ASAP.
> {code:java}
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.423 sec <<< FAILURE! - in org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase
> org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase Time elapsed: 3.423 sec <<< ERROR!
> java.io.IOException: Failed to start the Queryable State Data Server.
> at > org.apache.flink.runtime.io.network.NetworkEnvironment.start(NetworkEnvironment.java:348) > at > org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:231) > at > org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:365) > at > org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:801) > at > org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:325) > at > org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:140) > at > org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:84) > at > org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:51) > at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > Caused by: org.apache.flink.util.FlinkRuntimeException: Unable to start > Queryable State Server. All ports in provided range are occupied. 
> at > org.apache.flink.queryablestate.network.AbstractServerBase.start(AbstractServerBase.java:198) > at > org.apache.flink.queryablestate.server.KvStateServerImpl.start(KvStateServerImpl.java:95) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.start(NetworkEnvironment.java:344) > at > org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:231) > at > org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:365) > at > org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:801) > at > org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:325) > at > org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:140) > at > org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:84) > at > org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:51) > at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefi
[jira] [Created] (FLINK-10983) NonHAQueryableStateFsBackendITCase failed when run mvn install
Hequn Cheng created FLINK-10983: --- Summary: NonHAQueryableStateFsBackendITCase failed when run mvn install Key: FLINK-10983 URL: https://issues.apache.org/jira/browse/FLINK-10983 Project: Flink Issue Type: Bug Affects Versions: 1.7.0 Reporter: Hequn Cheng Assignee: Hequn Cheng I tried to test 1.7.0-rc2 and found the following error. I think we should harden this test by increasing the port range. Since the 1.7 release is around the corner, I will try to fix it ASAP. {code:java} Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.423 sec <<< FAILURE! - in org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase Time elapsed: 3.423 sec <<< ERROR! java.io.IOException: Failed to start the Queryable State Data Server. at org.apache.flink.runtime.io.network.NetworkEnvironment.start(NetworkEnvironment.java:348) at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:231) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:365) at org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:801) at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:325) at org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:140) at org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:84) at org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:51) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) Caused by: org.apache.flink.util.FlinkRuntimeException: Unable to start Queryable State Server. All ports in provided range are occupied. at org.apache.flink.queryablestate.network.AbstractServerBase.start(AbstractServerBase.java:198) at org.apache.flink.queryablestate.server.KvStateServerImpl.start(KvStateServerImpl.java:95) at org.apache.flink.runtime.io.network.NetworkEnvironment.start(NetworkEnvironment.java:344) at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:231) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:365) at org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:801) at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:325) at org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:140) at org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:84) at org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:51) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
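The failure comes from the Queryable State server scanning a port range and giving up once every candidate is taken, so widening the range makes collisions between concurrently running test JVMs less likely. A minimal sketch of the scan-and-bind pattern, using hypothetical helper names rather than Flink's actual API:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortRangeBind {
    // Try each candidate port in order and return the first successful bind.
    // A wider range lowers the chance that parallel test JVMs exhaust it.
    static ServerSocket bindFirstFree(int from, int to) {
        for (int port = from; port <= to; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Port occupied (or otherwise unavailable): try the next one.
            }
        }
        throw new RuntimeException("All ports in provided range are occupied.");
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free ephemeral port, so this always succeeds.
        try (ServerSocket s = bindFirstFree(0, 0)) {
            System.out.println("bound to port " + s.getLocalPort());
        }
    }
}
```

Tests that bind fixed ports are inherently racy; using a wide range (or ephemeral port 0 where possible) is the usual hardening.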
[jira] [Updated] (FLINK-10980) Wrong statement in CEP Scala part
[ https://issues.apache.org/jira/browse/FLINK-10980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10980: --- Labels: pull-request-available (was: ) > Wrong statement in CEP Scala part > - > > Key: FLINK-10980 > URL: https://issues.apache.org/jira/browse/FLINK-10980 > Project: Flink > Issue Type: Bug > Components: CEP, Documentation >Affects Versions: 1.6.2 >Reporter: DuBin >Priority: Minor > Labels: pull-request-available > > The CEP 'Selecting from patterns' part of Doc gives wrong code. > {{ val startEvent = pattern.get("start").get.next }} > {{val endEvent = pattern.get("end").get.next}} > {{should be .get.head}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10980) Wrong statement in CEP Scala part
[ https://issues.apache.org/jira/browse/FLINK-10980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695551#comment-16695551 ] ASF GitHub Bot commented on FLINK-10980: dubin555 opened a new pull request #7158: [FLINK-10980][docs] Fix Iterable Scala code in CEP part URL: https://github.com/apache/flink/pull/7158 ## What is the purpose of the change This pull request fixes the Iterable Scala code in the CEP part of the docs. ## Brief change log - *Change Iterable[T].get.next to Iterable[T].get.head* because Iterable[T].get.next does not compile. ## Verifying this change This change is a docs cleanup. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Wrong statement in CEP Scala part > - > > Key: FLINK-10980 > URL: https://issues.apache.org/jira/browse/FLINK-10980 > Project: Flink > Issue Type: Bug > Components: CEP, Documentation >Affects Versions: 1.6.2 >Reporter: DuBin >Priority: Minor > Labels: pull-request-available > > The CEP 'Selecting from patterns' part of the docs gives wrong code. > {{val startEvent = pattern.get("start").get.next}} > {{val endEvent = pattern.get("end").get.next}} > These should be {{.get.head}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dubin555 opened a new pull request #7158: [FLINK-10980][docs] Fix Iterable Scala code in CEP part
dubin555 opened a new pull request #7158: [FLINK-10980][docs] Fix Iterable Scala code in CEP part URL: https://github.com/apache/flink/pull/7158 ## What is the purpose of the change This pull request fixes the Iterable Scala code in the CEP part of the docs. ## Brief change log - *Change Iterable[T].get.next to Iterable[T].get.head* because Iterable[T].get.next does not compile. ## Verifying this change This change is a docs cleanup. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10982) Test DataStream to Table for Flink 1.6.2
YUZHOU HONG created FLINK-10982: --- Summary: Test DataStream to Table for Flink 1.6.2 Key: FLINK-10982 URL: https://issues.apache.org/jira/browse/FLINK-10982 Project: Flink Issue Type: Test Components: Table API & SQL Affects Versions: 1.6.2 Environment: jdk1.8, flink1.6, macOS 10.13 Reporter: YUZHOU HONG I am a newcomer to the Flink Table API & SQL. When I referenced the official doc to test a demo that converts two DataStreams into Tables and then union-alls them, the system reported an exception: "Exception in thread "main" java.lang.NoSuchFieldError: DOT". By the way, the part of the official doc I read is [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html] Here are my code and the exception stack trace. {code:java} import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class StreamToSql { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello")); DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello")); // stream1.print(); Table table1 = tEnv.fromDataStream(stream1, "count, word"); Table table2 = tEnv.fromDataStream(stream2, "count, word"); Table table = table1 .where("LIKE(word, 'F%')") .unionAll(table2); DataStream<Row> res = tEnv.toAppendStream(table, Row.class); res.print(); env.execute("StreamToSql"); } } {code} {code:java} SLF4J: Class 
path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/yizhou/.m2/repository/org/slf4j/slf4j-log4j12/1.7.5/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/yizhou/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Users/yizhou/.m2/repository/ch/qos/logback/logback-classic/1.1.3/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Exception in thread "main" java.lang.NoSuchFieldError: DOT at org.apache.flink.table.validate.BasicOperatorTable.<init>(FunctionCatalog.scala:316) at org.apache.flink.table.validate.FunctionCatalog.getSqlOperatorTable(FunctionCatalog.scala:58) at org.apache.flink.table.api.TableEnvironment.getSqlOperatorTable(TableEnvironment.scala:129) at org.apache.flink.table.api.TableEnvironment.frameworkConfig$lzycompute(TableEnvironment.scala:92) at org.apache.flink.table.api.TableEnvironment.frameworkConfig(TableEnvironment.scala:86) at org.apache.flink.table.api.TableEnvironment.relBuilder$lzycompute(TableEnvironment.scala:98) at org.apache.flink.table.api.TableEnvironment.relBuilder(TableEnvironment.scala:98) at org.apache.flink.table.api.TableEnvironment.typeFactory$lzycompute(TableEnvironment.scala:103) at org.apache.flink.table.api.TableEnvironment.typeFactory(TableEnvironment.scala:103) at org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:564) at org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:519) at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:89) at realtime.stream.StreamToSql.main(StreamToSql.java:25) Process finished with exit code 1 {code} -- This message was sent by Atlassian 
JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10981) Add or modify metrics to show the maximum usage of InputBufferPool/OutputBufferPool to help debugging back pressure
Yun Gao created FLINK-10981: --- Summary: Add or modify metrics to show the maximum usage of InputBufferPool/OutputBufferPool to help debugging back pressure Key: FLINK-10981 URL: https://issues.apache.org/jira/browse/FLINK-10981 Project: Flink Issue Type: Improvement Components: Metrics, Network Reporter: Yun Gao Assignee: Yun Gao Currently the network layer has provided two metrics items, namely _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge_ to show the usage of input buffer pool and output buffer pool. When there are multiple inputs (SingleInputGate) or outputs (ResultPartition), the two metrics items show their average usage. However, we found that the maximum usage of all the InputBufferPool or OutputBufferPool is also useful in debugging back pressure. Suppose we have a job with the following job graph: {code:java} F \ \ _\/ A ---> B > C ---> D \ \ \-> E {code} Also suppose D is very slow and thus causes back pressure, while E is very fast and F outputs few records, so the usage of the corresponding input/output buffer pools is almost 0. Then the average input/output buffer usage of each task will be: {code:java} A(100%) --> (100%) B (50%) --> (50%) C (100%) --> (100%) D {code} But the maximum input/output buffer usage of each task will be: {code:java} A(100%) --> (100%) B (100%) --> (100%) C (100%) --> (100%) D {code} Users will be able to find the slowest task by finding the first task whose input buffer usage is 100% but output usage is less than 100%. If it is reasonable to show the maximum input/output buffer usage, I think there may be three options: # Modify the current computation logic of _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge_. # Add two new metrics items _InputBufferPoolMaxUsageGauge_ and _OutputBufferPoolMaxUsageGauge_. # Try to show distinct usage for each input/output buffer pool. I think the second option is preferable. What do you think? 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
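The proposal above can be illustrated with plain gauge arithmetic. A sketch, assuming usages are reported as fractions in [0, 1] and using hypothetical helper names: with one pool at 100% and one near 0%, the average hides the saturated pool while the maximum exposes it, and the "first task whose input is saturated but whose output is not" rule falls out directly.

```java
import java.util.Arrays;

public class BufferUsageGauges {
    // Average usage over all pools: roughly what the current gauges report.
    static double avgUsage(double[] poolUsages) {
        return Arrays.stream(poolUsages).average().orElse(0.0);
    }

    // Maximum usage over all pools: what the proposed gauge would report.
    static double maxUsage(double[] poolUsages) {
        return Arrays.stream(poolUsages).max().orElse(0.0);
    }

    // The rule from the issue: a task whose input side is saturated while
    // its output side is not is a likely back-pressure bottleneck.
    static boolean looksLikeBottleneck(double[] inUsages, double[] outUsages) {
        return maxUsage(inUsages) >= 1.0 && maxUsage(outUsages) < 1.0;
    }

    public static void main(String[] args) {
        // Task B from the example: input from A at 100%, input from F near 0%.
        double[] inputs = {1.0, 0.0};
        System.out.println("avg = " + avgUsage(inputs)); // averaging hides the saturation
        System.out.println("max = " + maxUsage(inputs)); // the maximum exposes it
    }
}
```

This is only an illustration of the metric semantics, not of how Flink's `InputBufferPoolUsageGauge` is implemented internally.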
[jira] [Updated] (FLINK-10981) Add or modify metrics to show the maximum usage of InputBufferPool/OutputBufferPool to help debugging back pressure
[ https://issues.apache.org/jira/browse/FLINK-10981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-10981: Description: Currently the network layer has provided two metrics items, namely _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge_ to show the usage of input buffer pool and output buffer pool. When there are multiple inputs (SingleInputGate) or outputs (ResultPartition), the two metrics items show their average usage. However, we found that the maximum usage of all the InputBufferPool or OutputBufferPool is also useful in debugging back pressure. Suppose we have a job with the following job graph: {code:java} F \ \ _\/ A ---> B > C ---> D \ \ \-> E {code} Also suppose D is very slow and thus causes back pressure, while E is very fast and F outputs few records, so the usage of the corresponding input/output buffer pools is almost 0. Then the average input/output buffer usage of each task will be: {code:java} A(100%) --> (100%) B (50%) --> (50%) C (100%) --> (100%) D {code} But the maximum input/output buffer usage of each task will be: {code:java} A(100%) --> (100%) B (100%) --> (100%) C (100%) --> (100%) D {code} Users will be able to find the slowest task by finding the first task whose input buffer usage is 100% but output usage is less than 100%. If it is reasonable to show the maximum input/output buffer usage, I think there may be three options: # Modify the current computation logic of _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge_. # Add two new metrics items _InputBufferPoolMaxUsageGauge_ and _OutputBufferPoolMaxUsageGauge_. # Try to show distinct usage for each input/output buffer pool. I think the second option is preferable. What do you think? was: Currently the network layer has provided two metrics items, namely _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge_ to show the usage of input buffer pool and output buffer pool. 
When there are multiple inputs(SingleInputGate) or __ outputs(ResultPartition), the two metrics items show their average usage. However, we found that the maximum usage of all the InputBufferPool or OutputBufferPool is also useful in debugging back pressure. Suppose we have a job with the following job graph: {code:java} F \ \ _\/ A ---> B > C ---> D \ \ \-> E {code} Besides, also suppose D is very slow and thus cause back pressure, but E is very fast and F outputs few records, thus the usage of the corresponding input/output buffer pool is almost 0. Then the average input/output buffer usage of each task will be: {code:java} A(100%) --> (100%) B (50%) --> (50%) C (100%) --> (100%) D {code} But the maximum input/output buffer usage of each task will be: {code:java} A(100%) --> (100%) B (100%) --> (100%) C (100%) --> (100%) D {code} Users will be able to find the slowest task by finding the first task whose input buffer usage is 100% but output usage is less than 100%. If it is reasonable to show the maximum input/output buffer usage, I think there may be three options: # Modify the current computation logic of _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge._ # Add two __ new metrics items I_nputBufferPoolMaxUsageGauge and OutputBufferPoolUsageGauge._ # Try to show distinct usage for each input/output buffer pool. and I think maybe the second option is the most preferred. How do you think about that? > Add or modify metrics to show the maximum usage of > InputBufferPool/OutputBufferPool to help debugging back pressure > --- > > Key: FLINK-10981 > URL: https://issues.apache.org/jira/browse/FLINK-10981 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > Currently the network layer has provided two metrics items, namely > _InputBufferPoolUsageGauge_ and _OutputBufferPoolUsageGauge_ to show the > usage of input buffer pool and output buffer pool. 
When there are multiple > inputs(SingleInputGate) or outputs(ResultPartition), the two metrics items > show their average usage. > > However, we found that the maximum usage of all the InputBufferPool or > OutputBufferPool is also useful in debugging back pressure. Suppose we have a > job with the following job graph: > > {code:java} > F >\ > \ > _\/ > A ---> B > C ---> D >\ > \ > \
[jira] [Comment Edited] (FLINK-9870) Support field mapping and time attributes for table sinks
[ https://issues.apache.org/jira/browse/FLINK-9870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695521#comment-16695521 ] Jun Zhang edited comment on FLINK-9870 at 11/22/18 3:39 AM: [~twalthr] I assume time attributes are not relevant to table sinks and only make sense when registering a table source, so why do these interfaces need to be implemented by table sinks? In our case, some tables should be registered as both source and sink. When they are used as a source, time attributes are necessary, but when used as a sink, time attributes are just ignored. was (Author: junz): [~twalthr] I assume time attributes are not relevant to table sinks and only make sense when registering table source, why these interfaces need to be implemented by table sinks? > Support field mapping and time attributes for table sinks > - > > Key: FLINK-9870 > URL: https://issues.apache.org/jira/browse/FLINK-9870 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Fix For: 1.8.0 > > > FLINK-7548 reworked the table source design and implemented the interfaces > {{DefinedFieldMapping}}, {{DefinedProctimeAttribute}}, and > {{DefinedRowtimeAttributes}}. > However, these interfaces need to be implemented by table sinks as well in > order to map a table back into a sink, similar to how sources do it for reading > input data. > The current unified sink design assumes that this is possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9870) Support field mapping and time attributes for table sinks
[ https://issues.apache.org/jira/browse/FLINK-9870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695521#comment-16695521 ] Jun Zhang commented on FLINK-9870: -- [~twalthr] I assume time attributes are not relevant to table sinks and only make sense when registering a table source, so why do these interfaces need to be implemented by table sinks? > Support field mapping and time attributes for table sinks > - > > Key: FLINK-9870 > URL: https://issues.apache.org/jira/browse/FLINK-9870 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Fix For: 1.8.0 > > > FLINK-7548 reworked the table source design and implemented the interfaces > {{DefinedFieldMapping}}, {{DefinedProctimeAttribute}}, and > {{DefinedRowtimeAttributes}}. > However, these interfaces need to be implemented by table sinks as well in > order to map a table back into a sink, similar to how sources do it for reading > input data. > The current unified sink design assumes that this is possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10980) Wrong statement in CEP Scala part
DuBin created FLINK-10980: - Summary: Wrong statement in CEP Scala part Key: FLINK-10980 URL: https://issues.apache.org/jira/browse/FLINK-10980 Project: Flink Issue Type: Bug Components: CEP, Documentation Affects Versions: 1.6.2 Reporter: DuBin The CEP 'Selecting from patterns' part of the docs gives wrong code: {{val startEvent = pattern.get("start").get.next}} {{val endEvent = pattern.get("end").get.next}} These should use {{.get.head}} instead; Scala's {{Iterable}} has no {{next}} member, so the snippets do not compile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
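The doc bug comes down to Scala's `Iterable` exposing `head` but not `next` (`next` lives on `Iterator`). Since the surrounding examples are in Java, here is the Java analogue of taking the first matched event out of an `Iterable`; the helper name is hypothetical and for illustration only:

```java
import java.util.Arrays;
import java.util.Iterator;

public class FirstOfIterable {
    // Java analogue of Scala's Iterable.head: go through the iterator.
    // Calling `next` directly on an Iterable does not exist in either language.
    static <T> T first(Iterable<T> matchedEvents) {
        Iterator<T> it = matchedEvents.iterator();
        if (!it.hasNext()) {
            throw new IllegalStateException("no matched event for this pattern name");
        }
        return it.next();
    }

    public static void main(String[] args) {
        Iterable<String> startEvents = Arrays.asList("start-1", "start-2");
        System.out.println(first(startEvents)); // prints "start-1"
    }
}
```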
[jira] [Updated] (FLINK-10834) TableAPI flatten() calculated value error
[ https://issues.apache.org/jira/browse/FLINK-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10834: Issue Type: Sub-task (was: Bug) Parent: FLINK-10972 > TableAPI flatten() calculated value error > - > > Key: FLINK-10834 > URL: https://issues.apache.org/jira/browse/FLINK-10834 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.7.1 > > > We have a UDF as follows: > {code:java} > object FuncRow extends ScalarFunction { > def eval(v: Int): Row = { > val version = "" + new Random().nextInt() > val row = new Row(3) > row.setField(0, version) > row.setField(1, version) > row.setField(2, version) > row > } > override def isDeterministic: Boolean = false > override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = > Types.ROW(Types.STRING, Types.STRING, Types.STRING) > } > {code} > Do the following query: > {code:sql} > val data = new mutable.MutableList[(Int, Long, String)] > data.+=((1, 1L, "Hi")) > val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b,'c) > .select(FuncRow('a).flatten()).as('v1, 'v2, 'v3) > {code} > The result is: -1189206469,-151367792,1988676906 > The result expected by the user is v1 == v2 == v3. > It looks like the real reason is that the result is not reused in codegen. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
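The suspected codegen issue can be illustrated without Flink: if a non-deterministic expression is re-evaluated once per flattened field instead of once per row, the three fields diverge. A sketch with a hypothetical stand-in for the `FuncRow` UDF:

```java
import java.util.Random;

public class NonDeterministicReuse {
    static final Random RANDOM = new Random();

    // Hypothetical stand-in for the non-deterministic FuncRow UDF.
    static String version() {
        return "" + RANDOM.nextInt();
    }

    // Without common-subexpression reuse, the generated code evaluates the
    // UDF once per flattened field, so the fields almost surely differ.
    static String[] withoutReuse() {
        return new String[]{version(), version(), version()};
    }

    // With reuse, the UDF runs once per row and every field sees the same
    // value, which is what the user expects (v1 == v2 == v3).
    static String[] withReuse() {
        String v = version();
        return new String[]{v, v, v};
    }

    public static void main(String[] args) {
        String[] r = withReuse();
        System.out.println(r[0].equals(r[1]) && r[1].equals(r[2])); // true
    }
}
```

This only demonstrates why `isDeterministic = false` makes result reuse observable; it says nothing about where in Flink's code generation the fix belongs.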
[jira] [Assigned] (FLINK-10977) Add UnBounded FlatAggregate operator to streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-10977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10977: --- Assignee: Hequn Cheng > Add UnBounded FlatAggregate operator to streaming Table API > --- > > Key: FLINK-10977 > URL: https://issues.apache.org/jira/browse/FLINK-10977 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.8.0 > > > Add FlatAggregate operator to streaming Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global table aggregates > .flatAgg(fun: TableAggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10978) Add UnBounded FlatAggregate operator to batch Table API
sunjincheng created FLINK-10978: --- Summary: Add UnBounded FlatAggregate operator to batch Table API Key: FLINK-10978 URL: https://issues.apache.org/jira/browse/FLINK-10978 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: sunjincheng Fix For: 1.8.0 Add FlatAggregate operator to batch Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr] . The usage: {code:java} val res = tab .groupBy('a) // leave out groupBy-clause to define global table aggregates .flatAgg(fun: TableAggregateFunction) // output has columns 'a, 'b, 'c .select('a, 'c) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
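For intuition, a table aggregate (`flatAgg`) differs from a plain aggregate in that it may emit multiple rows per group, for example a "top 2" per key. A self-contained sketch of that shape, independent of Flink's actual `TableAggregateFunction` API (names are hypothetical):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Top2FlatAggregate {
    // A "table aggregate": maps a whole group to zero or more output rows,
    // unlike a plain aggregate which always produces exactly one value.
    static List<Integer> top2(List<Integer> group) {
        return group.stream()
                .sorted(Comparator.reverseOrder())
                .limit(2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Analogue of tab.groupBy('a).flatAgg(top2): each key yields up to two rows.
        Map<String, List<Integer>> grouped = new HashMap<>();
        grouped.put("a", Arrays.asList(1, 5, 3));
        grouped.put("b", Arrays.asList(7));
        grouped.forEach((k, v) -> System.out.println(k + " -> " + top2(v)));
    }
}
```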
[jira] [Created] (FLINK-10979) Add support for group keys in Unbounded Aggregate/FlatAggregate operator
sunjincheng created FLINK-10979: --- Summary: Add support for group keys in Unbounded Aggregate/FlatAggregate operator Key: FLINK-10979 URL: https://issues.apache.org/jira/browse/FLINK-10979 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: sunjincheng Fix For: 1.8.0 Add support for group keys in Aggregate/FlatAggregate operator, the detail will be described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10978) Add UnBounded FlatAggregate operator to batch Table API
[ https://issues.apache.org/jira/browse/FLINK-10978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10978: --- Assignee: Hequn Cheng > Add UnBounded FlatAggregate operator to batch Table API > --- > > Key: FLINK-10978 > URL: https://issues.apache.org/jira/browse/FLINK-10978 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.8.0 > > > Add FlatAggregate operator to batch Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr] > . > The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global table aggregates > .flatAgg(fun: TableAggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10977) Add UnBounded FlatAggregate operator to streaming Table API
sunjincheng created FLINK-10977: --- Summary: Add UnBounded FlatAggregate operator to streaming Table API Key: FLINK-10977 URL: https://issues.apache.org/jira/browse/FLINK-10977 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: sunjincheng Fix For: 1.8.0 Add FlatAggregate operator to streaming Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. The usage: {code:java} val res = tab .groupBy('a) // leave out groupBy-clause to define global table aggregates .flatAgg(fun: TableAggregateFunction) // output has columns 'a, 'b, 'c .select('a, 'c){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-10976: --- Assignee: Dian Fu (was: Hequn Cheng) > Add Aggregate operator to Table API > --- > > Key: FLINK-10976 > URL: https://issues.apache.org/jira/browse/FLINK-10976 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Fix For: 1.8.0 > > > Add Aggregate operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global aggregates > .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10976) Add Aggregate operator to Table API
sunjincheng created FLINK-10976: --- Summary: Add Aggregate operator to Table API Key: FLINK-10976 URL: https://issues.apache.org/jira/browse/FLINK-10976 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: sunjincheng Fix For: 1.8.0 Add Aggregate operator to Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. The usage: {code:java} val res = tab .groupBy('a) // leave out groupBy-clause to define global aggregates .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c .select('a, 'c) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10976: --- Assignee: Hequn Cheng > Add Aggregate operator to Table API > --- > > Key: FLINK-10976 > URL: https://issues.apache.org/jira/browse/FLINK-10976 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.8.0 > > > Add Aggregate operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global aggregates > .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10974) Add FlatMap to TableAPI
[ https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-10974: --- Assignee: Dian Fu > Add FlatMap to TableAPI > --- > > Key: FLINK-10974 > URL: https://issues.apache.org/jira/browse/FLINK-10974 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Fix For: 1.8.0 > > > Add FlatMap operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab >.flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10975) Add support for TimeAttribute in Map/FlatMap operator
sunjincheng created FLINK-10975: --- Summary: Add support for TimeAttribute in Map/FlatMap operator Key: FLINK-10975 URL: https://issues.apache.org/jira/browse/FLINK-10975 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: sunjincheng Assignee: sunjincheng Fix For: 1.8.0 Add support for TimeAttribute in Map/Flatmap operator,the detail will be described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-10972: Affects Version/s: (was: 1.8.0) > Enhancements to Flink Table API > --- > > Key: FLINK-10972 > URL: https://issues.apache.org/jira/browse/FLINK-10972 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Fix For: 1.8.0 > > > With the continuous efforts from the community, the Flink system has been > continuously improved, which has attracted more and more users. Flink SQL is > a canonical, widely used relational query language. However, there are still > some scenarios where Flink SQL fails to meet user needs in terms of > functionality and ease of use, such as: > * In terms of functionality > Iteration, user-defined window, user-defined join, user-defined GroupReduce, > etc. Users cannot express them with SQL; > * In terms of ease of use > * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), > udf2(), udf3())” can be used to accomplish the same function, with a > map() function returning 100 columns, one has to define or call 100 UDFs when > using SQL, which is quite involved. > * FlatMap - e.g. “dataStream.flatMap(flatMapFun)”. Similarly, it can be > implemented with “table.join(udtf).select()”. However, it is obvious that > DataStream is easier to use than SQL. > Due to the above two reasons, in this group of JIRAs we will enhance the > Table API in stages. 
> --- > The first stage we seek to support (will describe the details in the sub > issue) : > * Table.map() > * Table.flatMap() > * GroupedTable.aggregate() > * GroupedTable.flatAggregate() > The design document and the discussion mailing list can be found here: > Google doc: > [https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit] > [DISCUSS] Enhancing the functionality and productivity of Table API > [https://lists.apache.org/thread.html/881b34fe79991870c099132b4723dde882cffcfff8e9a1f5bbe92bee@%3Cdev.flink.apache.org%3E] > [DISCUSS] Table API Enhancement Outline: > [https://lists.apache.org/thread.html/a75f5d0a938333503a0f1881f800d37ba0ec662b44624d4be9c6fdd9@%3Cdev.flink.apache.org%3E] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10973) Add Map operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-10973: Affects Version/s: (was: 1.8.0) > Add Map operator to Table API > - > > Key: FLINK-10973 > URL: https://issues.apache.org/jira/browse/FLINK-10973 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Fix For: 1.8.0 > > > Add Map operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr] > The usage: > {code:java} > val res = tab >.map(fun: ScalarFunction) // output has columns 'a, 'b, 'c >.select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-10972: Fix Version/s: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10973) Add Map operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-10973: Fix Version/s: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10974) Add FlatMap to TableAPI
sunjincheng created FLINK-10974: --- Summary: Add FlatMap to TableAPI Key: FLINK-10974 URL: https://issues.apache.org/jira/browse/FLINK-10974 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: sunjincheng Fix For: 1.8.0 Add FlatMap operator to Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. The usage: {code:java} val res = tab .flatMap(fun: TableFunction) // output has columns 'a, 'b, 'c .select('a, 'c) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-10972: Affects Version/s: (was: 1.7.1) 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10973) Add Map operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-10973: --- Assignee: Dian Fu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10973) Add Map operator to Table API
sunjincheng created FLINK-10973: --- Summary: Add Map operator to Table API Key: FLINK-10973 URL: https://issues.apache.org/jira/browse/FLINK-10973 Project: Flink Issue Type: Sub-task Components: Table API & SQL Affects Versions: 1.8.0 Reporter: sunjincheng Add Map operator to Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr] The usage: {code:java} val res = tab .map(fun: ScalarFunction) // output has columns 'a, 'b, 'c .select('a, 'c) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10964) sql-client throws exception when paging through finished batch query
[ https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seth Wiesman updated FLINK-10964: - Summary: sql-client throws exception when paging through finished batch query (was: sql-client throws exception when pagin through finished batch query ) > sql-client throws exception when paging through finished batch query > - > > Key: FLINK-10964 > URL: https://issues.apache.org/jira/browse/FLINK-10964 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: Seth Wiesman >Assignee: vinoyang >Priority: Major > > When paging through a batch query in state 'Finished' the sql client throws > the following exception: > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a > result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10972: Description: With the continuous efforts from the community, the Flink system has been continuously improved, which has attracted more and more users. Flink SQL is a canonical, widely used relational query language. However, there are still some scenarios where Flink SQL failed to meet user needs in terms of functionality and ease of use, such as: * In terms of functionality Iteration, user-defined window, user-defined join, user-defined GroupReduce, etc. Users cannot express them with SQL; * In terms of ease of use * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), udf2(), udf3())” can be used to accomplish the same function., with a map() function returning 100 columns, one has to define or call 100 UDFs when using SQL, which is quite involved. * FlatMap - e.g. “dataStrem.flatmap(flatMapFun)”. Similarly, it can be implemented with “table.join(udtf).select()”. However, it is obvious that datastream is easier to use than SQL. Due to the above two reasons, In this JIRAs group, we will enhance the TableAPI in stages. 
--- The first state we seek to support (will describe the details in the sub issue) : * Table.map() * Table.flatMap() * GroupedTable.aggregate() * GroupedTable.flatAggregate() The design document and the discussion mail list can be find here: Google doc: [https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit] [DISCUSS] Enhancing the functionality and productivity of Table API [https://lists.apache.org/thread.html/881b34fe79991870c099132b4723dde882cffcfff8e9a1f5bbe92bee@%3Cdev.flink.apache.org%3E] [DISCUSS] Table API Enhancement Outline: [https://lists.apache.org/thread.html/a75f5d0a938333503a0f1881f800d37ba0ec662b44624d4be9c6fdd9@%3Cdev.flink.apache.org%3E] was: With the continuous efforts from the community, the Flink system has been continuously improved, which has attracted more and more users. Flink SQL is a canonical, widely used relational query language. However, there are still some scenarios where Flink SQL failed to meet user needs in terms of functionality and ease of use, such as: * In terms of functionality Iteration, user-defined window, user-defined join, user-defined GroupReduce, etc. Users cannot express them with SQL; * In terms of ease of use * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), udf2(), udf3())” can be used to accomplish the same function., with a map() function returning 100 columns, one has to define or call 100 UDFs when using SQL, which is quite involved. * FlatMap - e.g. “dataStrem.flatmap(flatMapFun)”. Similarly, it can be implemented with “table.join(udtf).select()”. However, it is obvious that datastream is easier to use than SQL. Due to the above two reasons, In this JIRAs group, we will enhance the TableAPI in stages. 
--- The first state we seek to support (will describe the details in the sub issue) : * Table.map() * Table.flatMap() * GroupedTable.aggregate() * GroupedTable.flatAggregate() The design document and the discussion mail list can be find here: Google doc: [https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit] [DISCUSS] Enhancing the functionality and productivity of Table API [https://mail.google.com/mail/u/0/#search/sunjincheng121/QgrcJHsbcVCQqBxXcgCwnggdxDBljCvbbgQ|http://example.com] [DISCUSS] Table API Enhancement Outline: [https://mail.google.com/mail/u/0/#search/xiaowei/FMfcgxvzLWzfvCnmvMzzSfxHTSfdwLkB] > Enhancements to Flink Table API > --- > > Key: FLINK-10972 > URL: https://issues.apache.org/jira/browse/FLINK-10972 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.7.1 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > With the continuous efforts from the community, the Flink system has been > continuously improved, which has attracted more and more users. Flink SQL is > a canonical, widely used relational query language. However, there are still > some scenarios where Flink SQL failed to meet user needs in terms of > functionality and ease of use, such as: > * In terms of functionality > Iteration, user-defined window, user-defined join, user-defined GroupReduce, > etc. Users cannot express them with SQL; > * In terms of ease of use > * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), > udf2(), udf3())” can be used to acc
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10972: Issue Type: New Feature (was: Improvement) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695482#comment-16695482 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars opened a new pull request #7157: [FLINK-10134] UTF-16 support for TextInputFormat bug URL: https://github.com/apache/flink/pull/7157 ## What is the purpose of the change *(This PR solves the problem of `TextInputFormat` failing to parse files that use UTF-16 encoding.)* ## Brief change log *(To fix this bug, I added a BOM header check that determines the current file's encoding and defines how a conflict between the user-configured encoding and the encoding indicated by the file's BOM header is handled. The specific changes are the following:)* - I added a `getBomFileCharset` function to `DelimitedInputFormat.java` that detects the file's BOM header, covering `UTF-16BE`, `UTF-16LE`, `UTF-8 with BOM header`, `UTF-32BE` and `UTF-32LE`, and defaulting to `UTF-8`. - I added the `bomBytes`, `fileBomCharsetMap`, `bomIdentifiedCharset` and `configuredCharset` variables to `DelimitedInputFormat.java`, a `getBomFileCharset(split)` call to the `open` method, and `setBomFileCharset` to set the `bomIdentifiedCharset` and `fileBomCharsetMap` variables. The name of a file that has been parsed is used as the key, and its detected encoding is inserted as the value into `fileBomCharsetMap`. - In the `DelimitedInputFormat.java` method `getCharset()`, logic is added to obtain the encoding of the current file. I handle the different charsets with three private fields: 1. `configuredCharset`: the charset that is configured via `setCharset()`. 2. `bomIdentifiedCharset`: the charset that is set by `setBomFileCharset()`. 3. `charset`: the charset that is returned by `getCharset()`. If not set before (i.e., null), it is set first depending on the `configuredCharset` and `bomIdentifiedCharset`. 
- In `DelimitedInputFormat.java`, the method `GetSpecialCharset` handles the two special cases of user input, UTF-16 and UTF-32. ## Verifying this change This change added tests and can be verified as follows: - *I added `testFileCharset`, `testAllFileCharset`, `createUTFEncodedFile` and `testFileCharsetReadByMultiSplits` to `TextInputFormatTest`.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) ## Discuss The following are the earlier modifications and discussions on this bug. Thank you Fabian Hueske for the review: [jira link](https://issues.apache.org/jira/browse/FLINK-10134?focusedCommentId=16652877&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16652877) [pr6823](https://github.com/apache/flink/pull/6823) [pr6710](https://github.com/apache/flink/pull/6710) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > UTF-16 support for TextInputFormat > -- > > Key: FLINK-10134 > URL: https://issues.apache.org/jira/browse/FLINK-10134 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.2 >Reporter: David Dreyfus >Priority: Critical > Labels: pull-request-available > > It does not appear that Flink supports a charset encoding of "UTF-16". It > particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) > to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. > > TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), > which sets TextInputFormat.charsetName and then modifies the previously set > delimiterSt
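The BOM-based charset detection described in the pull request above can be sketched in plain Java as follows. This is an illustrative sketch only, not Flink's actual `DelimitedInputFormat` code: the `BomSniffer`, `detect` and `resolve` names are hypothetical, and the precedence in `resolve` (an explicitly configured charset wins over the BOM-identified one) is an assumption about the intended conflict handling.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of BOM-based charset detection in the spirit of the
// getBomFileCharset / getCharset logic described in the PR (names are illustrative).
public class BomSniffer {

    // Returns the charset implied by the leading byte-order mark of the file,
    // or UTF-8 when no known BOM is present. The 4-byte UTF-32 marks must be
    // tested before the 2-byte UTF-16 marks, because FF FE is a prefix of the
    // UTF-32LE mark FF FE 00 00.
    public static Charset detect(byte[] head) {
        if (head.length >= 4 && head[0] == (byte) 0x00 && head[1] == (byte) 0x00
                && head[2] == (byte) 0xFE && head[3] == (byte) 0xFF) {
            return Charset.forName("UTF-32BE");
        }
        if (head.length >= 4 && head[0] == (byte) 0xFF && head[1] == (byte) 0xFE
                && head[2] == (byte) 0x00 && head[3] == (byte) 0x00) {
            return Charset.forName("UTF-32LE");
        }
        if (head.length >= 3 && head[0] == (byte) 0xEF && head[1] == (byte) 0xBB
                && head[2] == (byte) 0xBF) {
            return StandardCharsets.UTF_8; // UTF-8 with BOM header
        }
        if (head.length >= 2 && head[0] == (byte) 0xFE && head[1] == (byte) 0xFF) {
            return StandardCharsets.UTF_16BE;
        }
        if (head.length >= 2 && head[0] == (byte) 0xFF && head[1] == (byte) 0xFE) {
            return StandardCharsets.UTF_16LE;
        }
        return StandardCharsets.UTF_8; // no BOM: fall back to the default
    }

    // One plausible precedence for getCharset(): a user-configured charset wins,
    // then the BOM-identified one, then UTF-8 (assumption, not Flink's documented behavior).
    public static Charset resolve(Charset configured, Charset bomIdentified) {
        if (configured != null) {
            return configured;
        }
        if (bomIdentified != null) {
            return bomIdentified;
        }
        return StandardCharsets.UTF_8;
    }
}
```

For example, a file starting with the bytes `FE FF` would be read as UTF-16BE unless the user explicitly configured a different charset.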
[jira] [Assigned] (FLINK-10970) expose metric for total state size in terms of bytes
[ https://issues.apache.org/jira/browse/FLINK-10970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10970: Assignee: vinoyang > expose metric for total state size in terms of bytes > > > Key: FLINK-10970 > URL: https://issues.apache.org/jira/browse/FLINK-10970 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Steven Zhen Wu >Assignee: vinoyang >Priority: Major > > With incremental checkpoint, checkpoint size only captures the delta size. It > will be very useful if there is another metric that captures total state > size. even an approximate number would be super useful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10969) expose API or metric for total number of keys stored in state backend
[ https://issues.apache.org/jira/browse/FLINK-10969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10969: Assignee: vinoyang > expose API or metric for total number of keys stored in state backend > - > > Key: FLINK-10969 > URL: https://issues.apache.org/jira/browse/FLINK-10969 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Steven Zhen Wu >Assignee: vinoyang >Priority: Major > > [~srichter] mentioned it might make sense to provide two versions: exact > count and approximate count. For some state backends (like RocksDB), it may > be much cheaper to get an approximate count. > Exposing these as metrics would be ideal. > Additionally, it will also be useful to get the total count of timers, which > are also stored in the state backend. Stefan mentioned timers are just a > different namespace in the state backend (e.g. a column family in RocksDB). So it > will be very useful if the metrics have a _namespace_ tag. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10972: Description: With the continuous efforts from the community, the Flink system has been continuously improved, which has attracted more and more users. Flink SQL is a canonical, widely used relational query language. However, there are still some scenarios where Flink SQL failed to meet user needs in terms of functionality and ease of use, such as: * In terms of functionality Iteration, user-defined window, user-defined join, user-defined GroupReduce, etc. Users cannot express them with SQL; * In terms of ease of use * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), udf2(), udf3())” can be used to accomplish the same function., with a map() function returning 100 columns, one has to define or call 100 UDFs when using SQL, which is quite involved. * FlatMap - e.g. “dataStrem.flatmap(flatMapFun)”. Similarly, it can be implemented with “table.join(udtf).select()”. However, it is obvious that datastream is easier to use than SQL. Due to the above two reasons, In this JIRAs group, we will enhance the TableAPI in stages. 
--- The first state we seek to support (will describe the details in the sub issue) : * Table.map() * Table.flatMap() * GroupedTable.aggregate() * GroupedTable.flatAggregate() The design document and the discussion mail list can be find here: Google doc: [https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit] [DISCUSS] Enhancing the functionality and productivity of Table API [https://mail.google.com/mail/u/0/#search/sunjincheng121/QgrcJHsbcVCQqBxXcgCwnggdxDBljCvbbgQ|http://example.com] [DISCUSS] Table API Enhancement Outline: [https://mail.google.com/mail/u/0/#search/xiaowei/FMfcgxvzLWzfvCnmvMzzSfxHTSfdwLkB] was: With the continuous efforts from the community, the Flink system has been continuously improved, which has attracted more and more users. Flink SQL is a canonical, widely used relational query language. However, there are still some scenarios where Flink SQL failed to meet user needs in terms of functionality and ease of use, such as: * In terms of functionality Iteration, user-defined window, user-defined join, user-defined GroupReduce, etc. Users cannot express them with SQL; * In terms of ease of use * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), udf2(), udf3())” can be used to accomplish the same function., with a map() function returning 100 columns, one has to define or call 100 UDFs when using SQL, which is quite involved. * FlatMap - e.g. “dataStrem.flatmap(flatMapFun)”. Similarly, it can be implemented with “table.join(udtf).select()”. However, it is obvious that datastream is easier to use than SQL. Due to the above two reasons, In this JIRAs group, we will enhance the TableAPI in stages. 
--- The first state we seek to support (will describe the details in the sub issue) : * Table.map() * Table.flatMap() * GroupedTable.aggregate() * GroupedTable.flatAggregate() The design document and the discussion mail list can be find here: Google doc: [https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit] [DISCUSS] Enhancing the functionality and productivity of Table API : [https://mail.google.com/mail/u/0/#search/xiaowe/QgrcJHsbcVCQqBxXcgCwnggdxDBljCvbbgQ] [DISCUSS] Table API Enhancement Outline: [https://mail.google.com/mail/u/0/#search/xiaowei/FMfcgxvzLWzfvCnmvMzzSfxHTSfdwLkB] > Enhancements to Flink Table API > --- > > Key: FLINK-10972 > URL: https://issues.apache.org/jira/browse/FLINK-10972 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.7.1 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > With the continuous efforts from the community, the Flink system has been > continuously improved, which has attracted more and more users. Flink SQL is > a canonical, widely used relational query language. However, there are still > some scenarios where Flink SQL failed to meet user needs in terms of > functionality and ease of use, such as: > * In terms of functionality > Iteration, user-defined window, user-defined join, user-defined GroupReduce, > etc. Users cannot express them with SQL; > * In terms of ease of use > * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), > udf2(), udf3())” can be used to accomplish the same function., with a > map() function returning 100 columns
[jira] [Commented] (FLINK-10967) Update kafka dependency to 2.1.0
[ https://issues.apache.org/jira/browse/FLINK-10967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695453#comment-16695453 ] ASF GitHub Bot commented on FLINK-10967: yanghua commented on issue #7156: [FLINK-10967] Update kafka dependency to 2.1.0 URL: https://github.com/apache/flink/pull/7156#issuecomment-440881988 @ijuma Thanks for your contribution. Upgrading the dependent version of the Kafka client requires more changes. Please refer to the [PR](https://github.com/apache/flink/pull/7101/files) I upgraded to 2.0.1 before. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update kafka dependency to 2.1.0 > > > Key: FLINK-10967 > URL: https://issues.apache.org/jira/browse/FLINK-10967 > Project: Flink > Issue Type: Bug >Reporter: Ismael Juma >Priority: Major > Labels: pull-request-available > > Apache Kafka 2.1.0 includes an important improvement with regards to producer > timeouts as described in KIP-91: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10967) Update kafka dependency to 2.1.0
[ https://issues.apache.org/jira/browse/FLINK-10967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695454#comment-16695454 ] ASF GitHub Bot commented on FLINK-10967: yanghua commented on issue #7156: [FLINK-10967] Update kafka dependency to 2.1.0 URL: https://github.com/apache/flink/pull/7156#issuecomment-440883805 cc @tillrohrmann @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update kafka dependency to 2.1.0 > > > Key: FLINK-10967 > URL: https://issues.apache.org/jira/browse/FLINK-10967 > Project: Flink > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Labels: pull-request-available > > Apache Kafka 2.1.0 includes an important improvement with regards to producer > timeouts as described in KIP-91: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10967) Update kafka dependency to 2.1.0
[ https://issues.apache.org/jira/browse/FLINK-10967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-10967: - Issue Type: Sub-task (was: Bug) Parent: FLINK-10598 > Update kafka dependency to 2.1.0 > > > Key: FLINK-10967 > URL: https://issues.apache.org/jira/browse/FLINK-10967 > Project: Flink > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major > Labels: pull-request-available > > Apache Kafka 2.1.0 includes an important improvement with regards to producer > timeouts as described in KIP-91: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
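For readers unfamiliar with the KIP-91 improvement referenced in this issue: it bounds the total time a produced record may spend between send() and a success/failure report with a single producer setting, instead of the old interplay of retries and per-request timeouts. A hedged illustration of the relevant producer properties (values are examples, not recommendations; 120000 is the Kafka 2.1.0 default for delivery.timeout.ms):

```properties
# KIP-91 (shipped in Kafka 2.1.0): one upper bound on the time between
# send() and the producer reporting success or failure. Illustrative values.
delivery.timeout.ms=120000
# The bound must satisfy: delivery.timeout.ms >= linger.ms + request.timeout.ms
linger.ms=5
request.timeout.ms=30000
```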
[jira] [Created] (FLINK-10972) Enhancements to Flink Table API
sunjincheng created FLINK-10972: --- Summary: Enhancements to Flink Table API Key: FLINK-10972 URL: https://issues.apache.org/jira/browse/FLINK-10972 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.7.1 Reporter: sunjincheng Assignee: sunjincheng With the continuous efforts from the community, the Flink system has been continuously improved, which has attracted more and more users. Flink SQL is a canonical, widely used relational query language. However, there are still some scenarios where Flink SQL fails to meet user needs in terms of functionality and ease of use, such as: * In terms of functionality Iteration, user-defined window, user-defined join, user-defined GroupReduce, etc. Users cannot express them with SQL; * In terms of ease of use * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), udf2(), udf3())” can be used to accomplish the same thing, with a map() function returning 100 columns, one has to define or call 100 UDFs when using SQL, which is quite involved. * FlatMap - e.g. “dataStream.flatMap(flatMapFun)”. Similarly, it can be implemented with “table.join(udtf).select()”. However, it is obvious that the DataStream API is easier to use than SQL. For the above two reasons, this group of JIRAs will enhance the Table API in stages. 
--- The first stage we aim to support (the details will be described in the sub-issues): * Table.map() * Table.flatMap() * GroupedTable.aggregate() * GroupedTable.flatAggregate() The design document and the discussion mailing-list threads can be found here: Google doc: [https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit] [DISCUSS] Enhancing the functionality and productivity of Table API: [https://mail.google.com/mail/u/0/#search/xiaowe/QgrcJHsbcVCQqBxXcgCwnggdxDBljCvbbgQ] [DISCUSS] Table API Enhancement Outline: [https://mail.google.com/mail/u/0/#search/xiaowei/FMfcgxvzLWzfvCnmvMzzSfxHTSfdwLkB] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
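As a rough sketch of what the four operations listed above would look like in a pipeline (pseudocode; the signatures are hypothetical and the linked design doc is authoritative):

```
// Pseudocode sketch of the operations proposed in FLINK-10972;
// method signatures are illustrative, not the final API.
Table mapped  = table.map(mapFun);          // row -> row, like dataStream.map(mapFun)
Table flat    = table.flatMap(flatMapFun);  // row -> 0..n rows, replacing table.join(udtf).select()
Table agg     = table.groupBy("key").aggregate(aggFun);          // GroupedTable.aggregate()
Table flatAgg = table.groupBy("key").flatAggregate(tableAggFun); // GroupedTable.flatAggregate()
```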
[jira] [Commented] (FLINK-10965) suggest that change $resultTypeTerm to Object at org.apache.flink.table.codegen.calls.ScalarFunctionCallGen:104
[ https://issues.apache.org/jira/browse/FLINK-10965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695445#comment-16695445 ] Hequn Cheng commented on FLINK-10965: - {{getResultType}} has a higher priority than the result type of {{eval}}. Removing the {{getResultType}} method in your UDF should solve your problem. > suggest that change $resultTypeTerm to Object at > org.apache.flink.table.codegen.calls.ScalarFunctionCallGen:104 > --- > > Key: FLINK-10965 > URL: https://issues.apache.org/jira/browse/FLINK-10965 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.5.5 > Environment: 1.5.4 >Reporter: shaomeng.wang >Priority: Major > > at org.apache.flink.table.codegen.calls.ScalarFunctionCallGen:104 > > > {code:java} > val functionCallCode = > s""" > |${parameters.map(_.code).mkString("\n")} > |$resultTypeTerm $resultTerm = $functionReference.eval( > | ${parameters.map(_.resultTerm).mkString(", ")}); > |""".stripMargin{code} > > > When wrapping one scalar function in another, I use the prototypes of "eval" and > "getResultType" as > > {code:java} > public Object eval(Object... 
objs) throws Exception > public TypeInformation getResultType(Class[] signature) { > {code} > > but, in the generated code, it is changed to > > {code:java} > String result = eval(...){code} > > when getResultType returns Types.STRING, > and I get the following error message: > > > {panel:title=error msg} > Caused by: org.codehaus.commons.compiler.CompileException: Line 121, Column > 13: Assignment conversion not possible from type "java.lang.Object" to type > "double" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at > org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:10528) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3452) > at org.codehaus.janino.UnitCompiler.access$5200(UnitCompiler.java:212) > at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3416) > at org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3396) > at org.codehaus.janino.Java$Assignment.accept(Java.java:4300) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3396) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2316) > at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1450) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1443) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2848) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443) > at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1509) > at org.codehaus.janino.UnitCompiler.access$1600(UnitCompiler.java:212) > {panel} > > > the generated code is: > > > {code:java} > @Override > public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws > Exception { > org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1; > > boolean isNull$1 = 
(java.lang.String) in1.getField(0) == null; > java.lang.String result$0; > if (isNull$1) { > result$0 = ""; > } > else { > result$0 = (java.lang.String) (java.lang.String) in1.getField(0); > } > > > > java.lang.String result$2 = function_com$test$Test.eval( > isNull$1 ? null : (java.lang.String) result$0); > boolean isNull$4 = result$2 == null; > java.lang.String result$3; > if (isNull$4) { > result$3 = ""; > } > else { > result$3 = (java.lang.String) result$2; > } > if (isNull$4) { > out.setField(0, null); > } > else { > out.setField(0, result$3); > } > c.collect(out); > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
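Hequn's point above boils down to the gap between a method's declared return type and the runtime type of the value it returns: the codegen emits an assignment against the concrete type promised by getResultType, while the wrapper's eval is declared to return Object. A stdlib-only Java illustration of that gap (class and method names are hypothetical, not Flink code):

```java
import java.lang.reflect.Method;

public class DeclaredTypeDemo {
    // A wrapper-style function declared with the loosest possible signature,
    // mirroring the UDF in this issue (hypothetical stand-in, not Flink code).
    public static Object eval(Object... args) {
        return String.valueOf(args[0]);
    }

    public static void main(String[] args) throws Exception {
        Method m = DeclaredTypeDemo.class.getMethod("eval", Object[].class);
        // The runtime value is a String, but the declared return type is Object.
        // Code generated against a concrete type (as Flink's codegen does when
        // getResultType promises one) cannot assign the Object-typed call result
        // to that concrete type without a cast -- hence the janino compile error.
        System.out.println(m.getReturnType().getSimpleName()); // Object
        Object result = eval(42);
        System.out.println(result.getClass().getSimpleName()); // String
    }
}
```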
[jira] [Assigned] (FLINK-10964) sql-client throws exception when paging through finished batch query
[ https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10964: Assignee: vinoyang > sql-client throws exception when paging through finished batch query > > > Key: FLINK-10964 > URL: https://issues.apache.org/jira/browse/FLINK-10964 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: Seth Wiesman >Assignee: vinoyang >Priority: Major > > When paging through a batch query in state 'Finished' the sql client throws > the following exception: > {code:java} > org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a > result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10665) Port YARNSessionFIFOITCase#testJavaAPI to new codebase
[ https://issues.apache.org/jira/browse/FLINK-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695444#comment-16695444 ] ASF GitHub Bot commented on FLINK-10665: TisonKun commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c… URL: https://github.com/apache/flink/pull/6917#issuecomment-440876707 cc @GJL @StefanRRichter This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port YARNSessionFIFOITCase#testJavaAPI to new codebase > -- > > Key: FLINK-10665 > URL: https://issues.apache.org/jira/browse/FLINK-10665 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Port {{YARNSessionFIFOITCase#testJavaAPI}} to new codebase -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10887) Add source watermark tracking to the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695332#comment-16695332 ] ASF GitHub Bot commented on FLINK-10887: jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r235559380 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/watermark/SourceWatermark.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.watermark; + +import java.io.Serializable; + +/** + * This represents the watermark for a single source partition. + */ +public class SourceWatermark implements Serializable { Review comment: Sounds good. Will update shortly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add source watermark tracking to the JobMaster > -- > > Key: FLINK-10887 > URL: https://issues.apache.org/jira/browse/FLINK-10887 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Jamie Grier >Assignee: Jamie Grier >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > We need to add a new RPC to the JobMaster such that the current watermark for > every source sub-task can be reported and the current global minimum/maximum > watermark can be retrieved so that each source can adjust their partition > read rates in an attempt to keep sources roughly aligned in event time. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
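The RPC described in this issue amounts to a small aggregation on the JobMaster side: collect the latest watermark per source sub-task and expose the global minimum so that sources running ahead in event time can throttle their partition read rates. A minimal sketch of that bookkeeping (stdlib only; the class and method names are hypothetical, not the code in PR #7099):

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-subtask watermark tracking; illustrates the
// aggregation the JIRA describes, not the actual Flink implementation.
public class WatermarkTracker {
    private final ConcurrentHashMap<Integer, Long> latest = new ConcurrentHashMap<>();

    /** Called (via RPC in the real design) by each source sub-task.
     *  merge(..., Math::max) ensures a reported watermark never regresses. */
    public void report(int subtaskIndex, long watermark) {
        latest.merge(subtaskIndex, watermark, Math::max);
    }

    /** Global minimum across all reporting sub-tasks, i.e. the slowest source. */
    public long globalMin() {
        return latest.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        WatermarkTracker t = new WatermarkTracker();
        t.report(0, 100L);
        t.report(1, 50L);
        System.out.println(t.globalMin()); // 50
    }
}
```

A source comparing its own watermark against globalMin() can then decide how far ahead it is and slow down accordingly.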
[jira] [Commented] (FLINK-10971) Dependency convergence issue when building flink-s3-fs-presto
[ https://issues.apache.org/jira/browse/FLINK-10971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695304#comment-16695304 ] Ufuk Celebi commented on FLINK-10971: - *Note*: I verified that the actual RC binaries don't have this issue. So it must be something in my build environment. > Dependency convergence issue when building flink-s3-fs-presto > - > > Key: FLINK-10971 > URL: https://issues.apache.org/jira/browse/FLINK-10971 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Priority: Major > Attachments: FLINK-10971-build_output, > FLINK-10971-build_output_flink-s3-fs-presto > > > Trying to trigger a savepoint to S3 with a clean build of > {{release-1.7.0-rc2}} results in a {{java.lang.NoSuchMethodError: > org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V}}. > *Environment* > - Tag: {{release-1.7.0-rc2}} > - Build command: {{mvn clean package -DskipTests -Dcheckstyle.skip}} > - Maven version: > {code} > mvn -version > Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; > 2018-06-17T20:33:14+02:00) > Maven home: /usr/local/Cellar/maven/3.5.4/libexec > Java version: 1.8.0_192, vendor: Oracle Corporation, runtime: > /Library/Java/JavaVirtualMachines/jdk1.8.0_192.jdk/Contents/Home/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "mac os x", version: "10.14.1", arch: "x86_64", family: "mac" > {code} > *Steps to reproduce* > {code} > cp opt/flink-s3-fs-presto-1.7.0.jar lib > bin/start-cluster.sh > bin/flink run examples/streaming/TopSpeedWindowing.jar > bin/flink savepoint db37f69f21cbe54e9bf6b7e259a9c09e > {code} > *Stacktrace* > {code} > The program finished with the following exception: > org.apache.flink.util.FlinkException: Triggering a savepoint for the job > db37f69f21cbe54e9bf6b7e259a9c09e failed. 
> at > org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723) > at > org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) > at > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: > org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:292) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.concurrent.CompletionException: > java.lang.NoSuchMethodError: > org.apache.flink.fs.s3presto.shaded.com.google.common.base
[jira] [Created] (FLINK-10971) Dependency convergence issue when building flink-s3-fs-presto
Ufuk Celebi created FLINK-10971: --- Summary: Dependency convergence issue when building flink-s3-fs-presto Key: FLINK-10971 URL: https://issues.apache.org/jira/browse/FLINK-10971 Project: Flink Issue Type: Bug Reporter: Ufuk Celebi Trying to trigger a savepoint to S3 with a clean build of {{release-1.7.0-rc2}} results in a {{java.lang.NoSuchMethodError: org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V}}. *Environment* - Tag: {{release-1.7.0-rc2}} - Build command: {{mvn clean package -DskipTests -Dcheckstyle.skip}} - Maven version: {code} mvn -version Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-17T20:33:14+02:00) Maven home: /usr/local/Cellar/maven/3.5.4/libexec Java version: 1.8.0_192, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_192.jdk/Contents/Home/jre Default locale: en_US, platform encoding: UTF-8 OS name: "mac os x", version: "10.14.1", arch: "x86_64", family: "mac" {code} *Steps to reproduce* {code} cp opt/flink-s3-fs-presto-1.7.0.jar lib bin/start-cluster.sh bin/flink run examples/streaming/TopSpeedWindowing.jar bin/flink savepoint db37f69f21cbe54e9bf6b7e259a9c09e {code} *Stacktrace* {code} The program finished with the following exception: org.apache.flink.util.FlinkException: Triggering a savepoint for the job db37f69f21cbe54e9bf6b7e259a9c09e failed. 
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723) at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:292) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ... 12 more Caused by: java.lang.NoSuchMethodError: org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgume
[jira] [Updated] (FLINK-10971) Dependency convergence issue when building flink-s3-fs-presto
[ https://issues.apache.org/jira/browse/FLINK-10971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-10971: Attachment: FLINK-10971-build_output_flink-s3-fs-presto FLINK-10971-build_output > Dependency convergence issue when building flink-s3-fs-presto > - > > Key: FLINK-10971 > URL: https://issues.apache.org/jira/browse/FLINK-10971 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Priority: Major > Attachments: FLINK-10971-build_output, > FLINK-10971-build_output_flink-s3-fs-presto > > > Trying to trigger a savepoint to S3 with a clean build of > {{release-1.7.0-rc2}} results in a {{java.lang.NoSuchMethodError: > org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V}}. > *Environment* > - Tag: {{release-1.7.0-rc2}} > - Build command: {{mvn clean package -DskipTests -Dcheckstyle.skip}} > - Maven version: > {code} > mvn -version > Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; > 2018-06-17T20:33:14+02:00) > Maven home: /usr/local/Cellar/maven/3.5.4/libexec > Java version: 1.8.0_192, vendor: Oracle Corporation, runtime: > /Library/Java/JavaVirtualMachines/jdk1.8.0_192.jdk/Contents/Home/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "mac os x", version: "10.14.1", arch: "x86_64", family: "mac" > {code} > *Steps to reproduce* > {code} > cp opt/flink-s3-fs-presto-1.7.0.jar lib > bin/start-cluster.sh > bin/flink run examples/streaming/TopSpeedWindowing.jar > bin/flink savepoint db37f69f21cbe54e9bf6b7e259a9c09e > {code} > *Stacktrace* > {code} > The program finished with the following exception: > org.apache.flink.util.FlinkException: Triggering a savepoint for the job > db37f69f21cbe54e9bf6b7e259a9c09e failed. 
> at > org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:723) > at > org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:701) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) > at > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:698) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1065) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) > Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: > org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$14(JobMaster.java:970) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:292) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.concurrent.CompletionException: > java.lang.NoSuchMethodError: > org.apache.flink.fs.s3presto.shaded.com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V >
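The descriptor in the NoSuchMethodError, (ZLjava/lang/String;Ljava/lang/Object;)V, names a checkArgument overload taking (boolean, String, Object) and returning void; that fixed-arity overload exists only in newer Guava releases, which is consistent with an older Guava having been shaded due to the convergence issue. A small stdlib helper for decoding such JVM method descriptors (a hypothetical utility, just to make stack traces readable):

```java
public class DescriptorDecoder {
    // Translates a JVM method descriptor such as
    // "(ZLjava/lang/String;Ljava/lang/Object;)V" into a readable signature.
    public static String decode(String desc) {
        StringBuilder out = new StringBuilder("(");
        int i = 1;
        boolean first = true;
        while (desc.charAt(i) != ')') {
            if (!first) out.append(", ");
            int[] pos = {i};          // index passed by reference
            out.append(type(desc, pos));
            i = pos[0];
            first = false;
        }
        out.append(") -> ");
        int[] pos = {i + 1};          // return type follows the ')'
        out.append(type(desc, pos));
        return out.toString();
    }

    private static String type(String d, int[] i) {
        char c = d.charAt(i[0]++);
        switch (c) {
            case 'Z': return "boolean";
            case 'B': return "byte";
            case 'C': return "char";
            case 'S': return "short";
            case 'I': return "int";
            case 'J': return "long";
            case 'F': return "float";
            case 'D': return "double";
            case 'V': return "void";
            case '[': return type(d, i) + "[]";   // array: decode element type
            case 'L': {                            // object type: Lpkg/Class;
                int end = d.indexOf(';', i[0]);
                String name = d.substring(i[0], end).replace('/', '.');
                i[0] = end + 1;
                return name;
            }
            default: return String.valueOf(c);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("(ZLjava/lang/String;Ljava/lang/Object;)V"));
        // (boolean, java.lang.String, java.lang.Object) -> void
    }
}
```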
[jira] [Created] (FLINK-10970) expose metric for total state size in terms of bytes
Steven Zhen Wu created FLINK-10970: -- Summary: expose metric for total state size in terms of bytes Key: FLINK-10970 URL: https://issues.apache.org/jira/browse/FLINK-10970 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Reporter: Steven Zhen Wu With incremental checkpoints, the checkpoint size metric only captures the delta size. It would be very useful to have another metric that captures the total state size. Even an approximate number would be very useful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10969) expose API or metric for total number of keys stored in state backend
[ https://issues.apache.org/jira/browse/FLINK-10969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-10969: --- Description: [~srichter] mentioned it might make sense to provide two versions: an exact count and an approximate count. For some state backends (like RocksDB), it may be much cheaper to get an approximate count. Exposing these as metrics would be ideal. Additionally, it would also be useful to get the total count of timers, which are also stored in the state backend. Stefan mentioned timers are just a different namespace in the state backend (e.g. a column family in RocksDB). So it would be very useful if the metrics had a _namespace_ tag. was:[~srichter] mentioned it might make sense to provide two versions: exact count and approximate count. For some state backends (like RocksDB), it may be much cheaper to get an approximate count. > expose API or metric for total number of keys stored in state backend > - > > Key: FLINK-10969 > URL: https://issues.apache.org/jira/browse/FLINK-10969 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Steven Zhen Wu >Priority: Major > > [~srichter] mentioned it might make sense to provide two versions: an exact > count and an approximate count. For some state backends (like RocksDB), it may > be much cheaper to get an approximate count. > Exposing these as metrics would be ideal. > Additionally, it would also be useful to get the total count of timers, which > are also stored in the state backend. Stefan mentioned timers are just a > different namespace in the state backend (e.g. a column family in RocksDB). So it > would be very useful if the metrics had a _namespace_ tag. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10969) expose API or metric for total number of keys stored in state backend
Steven Zhen Wu created FLINK-10969: -- Summary: expose API or metric for total number of keys stored in state backend Key: FLINK-10969 URL: https://issues.apache.org/jira/browse/FLINK-10969 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Reporter: Steven Zhen Wu [~srichter] mentioned it might make sense to provide two versions: exact count and approximate count. For some state backends (like RocksDB), it may be much cheaper to get an approximate count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
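The exact-vs-approximate split [~srichter] suggests could look roughly like this. The class and method names below are hypothetical, not Flink's actual state backend API; a RocksDB-backed implementation would likely serve the approximate count cheaply from a DB property, while the heap map here is only a stand-in so the shape of the two methods is visible:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed two-version key-count API: an exact
// count (authoritative, but potentially expensive for a disk-based backend)
// and an approximate count (cheap, possibly stale or estimated).
public class KeyCountSketch {
    private final Map<String, byte[]> state = new HashMap<>();

    public void put(String key, byte[] value) {
        state.put(key, value);
    }

    // Exact: may require scanning the backend for on-disk state.
    public long exactKeyCount() {
        return state.size();
    }

    // Approximate: for a heap backend the size is already exact; a RocksDB
    // backend would instead return a cheap estimate maintained by the DB.
    public long approximateKeyCount() {
        return state.size();
    }

    public static void main(String[] args) {
        KeyCountSketch backend = new KeyCountSketch();
        backend.put("a", new byte[0]);
        backend.put("b", new byte[0]);
        System.out.println(backend.exactKeyCount() + " " + backend.approximateKeyCount());
    }
}
```

Registering both methods as gauges, each with a _namespace_ tag per column family, would cover the timer-count use case mentioned above as well.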
[jira] [Commented] (FLINK-10946) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695166#comment-16695166 ] ASF GitHub Bot commented on FLINK-10946: azagrebin commented on issue #7154: [FLINK-10946] Silent checkpoint async failures in task executor if job is not runnning URL: https://github.com/apache/flink/pull/7154#issuecomment-440785486 passed in my CI: https://travis-ci.org/azagrebin/flink/builds/458023830 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end > test failed on Travis > > > Key: FLINK-10946 > URL: https://issues.apache.org/jira/browse/FLINK-10946 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > The test failed 3 times in total during the overall night build, but > succeeded 2 times after restart. It did not fail locally for me. > Here is a travis build to run it 500 times (reproducable): > [https://travis-ci.org/azagrebin/flink/builds/457375100] > {code:java} > 2018-11-20 11:59:54,673 INFO org.apache.flink.runtime.taskmanager.Task > - Triggering cancellation of task code > ArtificalKeyedStateMapper_Avro -> ArtificalOperatorStateMapper (2/2) > (e06b7022f2f2154f2a84206f068ff1fd). > 2018-11-20 11:59:54,701 INFO > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Could not > complete snapshot 12 for operator ArtificalKeyedStateMapper_Avro -> > ArtificalOperatorStateMapper (1/2). > java.io.IOException: Cannot register Closeable, registry is already closed. > Closing argument. 
> at > org.apache.flink.util.AbstractCloseableRegistry.registerCloseable(AbstractCloseableRegistry.java:85) > at > org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.(AsyncSnapshotCallable.java:123) > at > org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.(AsyncSnapshotCallable.java:111) > at > org.apache.flink.runtime.state.AsyncSnapshotCallable.toAsyncSnapshotFutureTask(AsyncSnapshotCallable.java:105) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy.doSnapshot(RocksIncrementalSnapshotStrategy.java:164) > at > org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.snapshot(RocksDBSnapshotStrategyBase.java:128) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:496) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) > at > 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > 2018-11-20 11:59:54,702 INFO org.apache.flink.runtime.taskmanager.Task > - Ensuring all FileSystem streams are closed for task Source: > Custom Source -> Timestamps/Watermarks (2/2) > (09528d6ab0e1ee87ed21e78139682b18) [CANCELED] > 2018-11-20 11:59:54,703 INFO org.apache.flink.runtime.taskm
[jira] [Commented] (FLINK-10959) sometimes most taskmanger in the same node
[ https://issues.apache.org/jira/browse/FLINK-10959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695163#comment-16695163 ] Ken Krugler commented on FLINK-10959: - Hi [~luyee] - this is something that would be best to first bring up on the [Flink user mailing list|https://flink.apache.org/gettinghelp.html#user-mailing-list]. Then, if after discussion it looks like a bug, you can open a Jira issue. If that makes sense, please close this issue and post to the mailing list, thanks! > sometimes most taskmanger in the same node > --- > > Key: FLINK-10959 > URL: https://issues.apache.org/jira/browse/FLINK-10959 > Project: Flink > Issue Type: Bug >Reporter: Yee >Priority: Major > > flink on yarn > first: > 8* taskmanger ( 7 in node A, 1in node B) > after restart flink > 8*taskmanger (4 in node C, 3 in node D, 1 in node E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10968) Implement TaskManager Entrypoint
JIN SUN created FLINK-10968: --- Summary: Implement TaskManager Entrypoint Key: FLINK-10968 URL: https://issues.apache.org/jira/browse/FLINK-10968 Project: Flink Issue Type: Sub-task Reporter: JIN SUN Assignee: JIN SUN implement the main() entrypoint to start task manager pod. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695089#comment-16695089 ] ASF GitHub Bot commented on FLINK-9083: --- azagrebin commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r234920560 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds. 
+*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; + + // --- Cassandra Fields --- private final ClusterBuilder builder; - private final AtomicInteger updatesPending = new AtomicInteger(); + protected transient Cluster cluster; + protected transient Session session; + + // Synchronization Fields + + private AtomicReference throwable; + private Semaphore semaphore; + private Phaser phaser; CassandraSinkBase(ClusterBuilder builder) { this.builder = builder; ClosureCleaner.clean(builder, true); } - @Override - public void open(Configuration configuration) { - this.callback = new FutureCallback() { - @Override - public void onSuccess(V ignored) { - int pending = updatesPending.decrementAndGet(); - if (pending == 0) { - synchronized (updatesPending) { - updatesPending.notifyAll(); - } - } - } + // - Sink Methods - + @Override + public void open(Configuration parameters) { + cluster = createCluster(); + session = createSession(); + + throwable = new AtomicReference<>(); + semaphore = new Semaphore(maxConcurrentRequests); + /* +* A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch. +* +* This Phaser is configured to support "1 + N" parties. +* - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call. +* - "N" for the varying number of invoke() calls that register and de-register with the Phaser. +* +* The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call. 
+* This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits +* are being released during a flush() call. +*/ + phaser = new Phaser(1) { Review comment: Actually, as calls to `invoke()` and `snapshotState()` are serialised by the checkpoint lock, it seems good enough to have only a semaphore. When `flush()` is called, no new requests are created, so `flush()` could just wait for pending requests to finish by acquiring all the permits and then releasing them for next requests. `close()` might be called asynchronously when the job is canceled abruptly. We could have a volatile variable to signal it at the beginning of `close()` and not start new requests in `invoke()`. Eventually there will be no pending ones.
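The semaphore-only design suggested in the review could be sketched as follows. This is a minimal illustration with hypothetical method names, not the actual CassandraSinkBase code: one permit per in-flight request gives backpressure, and flush() waits for all pending requests by draining every permit and handing them back.

```java
import java.util.concurrent.Semaphore;

// Sketch of the reviewer's suggestion: drop the Phaser and rely on a single
// Semaphore. invoke() acquires a permit per request, the async completion
// callback releases it, and flush() drains all permits to wait for quiescence.
public class SemaphoreFlushSketch {
    private static final int MAX_CONCURRENT_REQUESTS = 4; // demo value, not a recommendation

    private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);

    // Called from invoke() under the checkpoint lock, so no new request can
    // race with flush(). Blocks once MAX_CONCURRENT_REQUESTS are in flight.
    public void beginRequest() throws InterruptedException {
        semaphore.acquire();
    }

    // Called from the Cassandra future's completion callback.
    public void endRequest() {
        semaphore.release();
    }

    // Acquiring every permit succeeds only once no request holds one;
    // release them afterwards so the next round of requests can proceed.
    public void flush() throws InterruptedException {
        semaphore.acquire(MAX_CONCURRENT_REQUESTS);
        semaphore.release(MAX_CONCURRENT_REQUESTS);
    }

    public static void main(String[] args) throws Exception {
        SemaphoreFlushSketch sink = new SemaphoreFlushSketch();
        sink.beginRequest();
        sink.endRequest();
        sink.flush(); // returns immediately: nothing is pending
        System.out.println("flushed, available permits = " + sink.semaphore.availablePermits());
    }
}
```

A volatile closed flag, checked before beginRequest(), would cover the abrupt-cancel case the reviewer mentions for close().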
[jira] [Reopened] (FLINK-5005) Remove Scala 2.10 support; add Scala 2.12 support
[ https://issues.apache.org/jira/browse/FLINK-5005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-5005: -- > Remove Scala 2.10 support; add Scala 2.12 support > - > > Key: FLINK-5005 > URL: https://issues.apache.org/jira/browse/FLINK-5005 > Project: Flink > Issue Type: Improvement > Components: Scala API >Reporter: Andrew Roberts >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.7.0 > > > Scala 2.12 was [released|http://www.scala-lang.org/news/2.12.0] today, and > offers many compile-time and runtime speed improvements. It would be great to > get artifacts up on maven central to allow Flink users to migrate to Scala > 2.12.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10355) Counting of table columns should start with 1 instead of 0
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10355: -- Summary: Counting of table columns should start with 1 instead of 0 (was: The order of the column should start from 1.) > Counting of table columns should start with 1 instead of 0 > -- > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: lihongli >Assignee: lihongli >Priority: Major > Labels: easyfix, pull-request-available > Fix For: 1.7.0 > > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource, it throws an > exception: "Parsing error for column 1". But I finally found that the second > column is the error column. I think that the counting of the columns should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-9126: -- > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector, DataSet API >Affects Versions: 1.4.2, 1.5.0 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source as a Tuple. This feature would allow the data to be > output as a custom POJO that the user has created and annotated > using the Datastax API. This would remove the need for very long Tuples to be > created by the DataSet and then mapped to the custom POJO. > > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but want other > thoughts as to the best way this should go about being implemented. 
> > //Example of its use in main > CassandraPojoInputFormat cassandraInputFormat = new > CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, > CustomCassandraPojo.class); > cassandraInputFormat.configure(null); > cassandraInputFormat.open(null); > DataSet outputTestSet = > exEnv.createInput(cassandraInputFormat, TypeInformation.of(new > TypeHint(){})); > > //The class that I currently have set up > [^CassandraPojoInputFormatText.rtf] > > Will make another Jira Issue for the Output version next if this is approved -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10172) Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc
[ https://issues.apache.org/jira/browse/FLINK-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10172: -- Summary: Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc (was: Inconsistentcy in ExpressionParser and ExpressionDsl for order by asc/desc) > Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc > - > > Key: FLINK-10172 > URL: https://issues.apache.org/jira/browse/FLINK-10172 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.3, 1.4.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Fix For: 1.3.3, 1.4.3, 1.5.4, 1.6.1, 1.7.0 > > > The following expression throws an exception in parsing {{"id.asc"}} term. > {code:java} > Table allOrders = orderTable > .select("id,order_date,amount,customer_id") > .orderBy("id.asc"); > {code} > while it is correctly parsed for Scala: > {code:scala} > val allOrders:Table = orderTable > .select('id, 'order_date, 'amount, 'customer_id) > .orderBy('id.asc) > {code} > Anticipated some inconsistency between ExpressionParser and ExpressionDsl -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10355) Counting of table columns should start with 1 instead of 0
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10355. - Resolution: Fixed > Counting of table columns should start with 1 instead of 0 > -- > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: lihongli >Assignee: lihongli >Priority: Major > Labels: easyfix, pull-request-available > Fix For: 1.7.0 > > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource, it throws an > exception: "Parsing error for column 1". But I finally found that the second > column is the error column. I think that the counting of the columns should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10355) The order of the column should start from 1.
[ https://issues.apache.org/jira/browse/FLINK-10355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10355: --- > The order of the column should start from 1. > > > Key: FLINK-10355 > URL: https://issues.apache.org/jira/browse/FLINK-10355 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: lihongli >Assignee: lihongli >Priority: Major > Labels: easyfix, pull-request-available > Fix For: 1.7.0 > > Attachments: B0C32FD9-47FE-4F63-921F-A9E49C0CB5CD.png > > > When I register an external Table using a CsvTableSource, it throws an > exception: "Parsing error for column 1". But I finally found that the second > column is the error column. I think that the counting of the columns should start > from 1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10056) Add test for JobMaster#requestNextInputSplit
[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10056: -- Summary: Add test for JobMaster#requestNextInputSplit (was: Add testRequestNextInputSplit) > Add test for JobMaster#requestNextInputSplit > > > Key: FLINK-10056 > URL: https://issues.apache.org/jira/browse/FLINK-10056 > Project: Flink > Issue Type: Improvement > Components: JobManager, Tests >Affects Versions: 1.5.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit > works as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10073) Allow setting a restart strategy in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10073: --- > Allow setting a restart strategy in SQL Client > -- > > Key: FLINK-10073 > URL: https://issues.apache.org/jira/browse/FLINK-10073 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, it is not possible to set a restart strategy per job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10967) Update kafka dependency to 2.1.0
[ https://issues.apache.org/jira/browse/FLINK-10967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10967: --- Labels: pull-request-available (was: ) > Update kafka dependency to 2.1.0 > > > Key: FLINK-10967 > URL: https://issues.apache.org/jira/browse/FLINK-10967 > Project: Flink > Issue Type: Bug >Reporter: Ismael Juma >Priority: Major > Labels: pull-request-available > > Apache Kafka 2.1.0 includes an important improvement with regards to producer > timeouts as described in KIP-91: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9126. Resolution: Fixed > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector, DataSet API >Affects Versions: 1.4.2, 1.5.0 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source as a Tuple. This feature would allow the data to be > output as a custom POJO that the user has created and annotated > using the Datastax API. This would remove the need for very long Tuples to be > created by the DataSet and then mapped to the custom POJO. > > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but want other > thoughts as to the best way this should go about being implemented. 
> > //Example of its use in main > CassandraPojoInputFormat cassandraInputFormat = new > CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, > CustomCassandraPojo.class); > cassandraInputFormat.configure(null); > cassandraInputFormat.open(null); > DataSet outputTestSet = > exEnv.createInput(cassandraInputFormat, TypeInformation.of(new > TypeHint(){})); > > //The class that I currently have set up > [^CassandraPojoInputFormatText.rtf] > > Will make another Jira Issue for the Output version next if this is approved -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10967) Update kafka dependency to 2.1.0
[ https://issues.apache.org/jira/browse/FLINK-10967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694951#comment-16694951 ] ASF GitHub Bot commented on FLINK-10967: ijuma opened a new pull request #7156: [FLINK-10967] Update kafka dependency to 2.1.0 URL: https://github.com/apache/flink/pull/7156 ## What is the purpose of the change Apache Kafka 2.1.0 includes an important improvement with regards to producer timeouts as described in KIP-91: https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer ## Brief change log The kafka connector kafka dependency has been updated to 2.1.0. ## Verifying this change This change is already covered by the existing kafka connector tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update kafka dependency to 2.1.0 > > > Key: FLINK-10967 > URL: https://issues.apache.org/jira/browse/FLINK-10967 > Project: Flink > Issue Type: Bug >Reporter: Ismael Juma >Priority: Major > Labels: pull-request-available > > Apache Kafka 2.1.0 includes an important improvement with regards to producer > timeouts as described in KIP-91: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
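For illustration, the KIP-91 improvement this upgrade enables is the producer's delivery.timeout.ms setting: an upper bound on the total time to report success or failure of a send, covering batching and retries. A hedged sketch of how it could be wired into producer properties (example values only; the bootstrap address is a placeholder):

```java
import java.util.Properties;

// Sketch of a Kafka 2.1.0 producer configuration using the KIP-91
// delivery.timeout.ms setting. Values are illustrative, not recommendations.
public class Kip91ConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Total time budget per record; must be >= request.timeout.ms + linger.ms.
        props.put("delivery.timeout.ms", "120000");
        props.put("request.timeout.ms", "30000");
        props.put("linger.ms", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("delivery.timeout.ms"));
    }
}
```

With the old retries-based tuning, the effective send deadline was hard to reason about; a single total-time bound is the main point of the KIP.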
[jira] [Updated] (FLINK-10309) Cancel with savepoint fails with java.net.ConnectException when using the per job-mode
[ https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10309: -- Summary: Cancel with savepoint fails with java.net.ConnectException when using the per job-mode (was: Cancel flink job occurs java.net.ConnectException) > Cancel with savepoint fails with java.net.ConnectException when using the per > job-mode > -- > > Key: FLINK-10309 > URL: https://issues.apache.org/jira/browse/FLINK-10309 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: vinoyang >Assignee: Gary Yao >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The problem occurs when using the Yarn per-job detached mode. Trying to > cancel with savepoint fails with the following exception before being able to > retrieve the savepoint path: > exception stack trace : > {code:java} > org.apache.flink.util.FlinkException: Could not cancel job . > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960) > at > org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not > complete the operation. Number of retries has been exhausted. > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398) > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583) > ... 
6 more > Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: > Could not complete the operation. Number of retries has been exhausted. > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > ... 1 more > Caused by: java.util.concurrent.CompletionException: > java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
[GitHub] ijuma opened a new pull request #7156: [FLINK-10967] Update kafka dependency to 2.1.0
ijuma opened a new pull request #7156: [FLINK-10967] Update kafka dependency to 2.1.0 URL: https://github.com/apache/flink/pull/7156 ## What is the purpose of the change Apache Kafka 2.1.0 includes an important improvement with regard to producer timeouts, as described in KIP-91: https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer ## Brief change log The Kafka connector's Kafka dependency has been updated to 2.1.0. ## Verifying this change This change is already covered by the existing Kafka connector tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
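A dependency bump of this kind amounts to a one-line change in the connector's Maven build. As a rough sketch (the exact module and whether the version lives in a property inside flink-connector-kafka may differ; `kafka-clients` 2.1.0 is the published Apache Kafka client artifact):

```xml
<!-- Hypothetical sketch of the version bump in the connector's pom.xml -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
```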
[jira] [Closed] (FLINK-10309) Cancel with savepoint fails with java.net.ConnectException when using the per job-mode
[ https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10309. - Resolution: Fixed > Cancel with savepoint fails with java.net.ConnectException when using the per > job-mode
[jira] [Created] (FLINK-10967) Update kafka dependency to 2.1.0
Ismael Juma created FLINK-10967: --- Summary: Update kafka dependency to 2.1.0 Key: FLINK-10967 URL: https://issues.apache.org/jira/browse/FLINK-10967 Project: Flink Issue Type: Bug Reporter: Ismael Juma Apache Kafka 2.1.0 includes an important improvement with regards to producer timeouts as described in KIP-91: https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10309) Cancel flink job occurs java.net.ConnectException
[ https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10309: --- > Cancel flink job occurs java.net.ConnectException
[jira] [Closed] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration
[ https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10371. - Resolution: Fixed > Allow to enable SSL mutual authentication on REST endpoints by configuration > > > Key: FLINK-10371 > URL: https://issues.apache.org/jira/browse/FLINK-10371 > Project: Flink > Issue Type: New Feature > Components: Client, REST, Security > Affects Versions: 1.6.0, 1.7.0 > Reporter: Johannes Dillmann > Assignee: Johannes Dillmann > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2 > > > With Flink 1.6, SSL mutual authentication was introduced for internal > connectivity in FLINK-9312. > SSL support for external connectivity was also introduced, covering > encryption of the connection and verification of the Flink REST endpoint from > the client side. > But _mutual authentication between the REST endpoint and clients is not > supported yet_. > The [documentation suggests > |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html] > using a sidecar proxy to enable SSL mutual auth on the REST endpoint and > points out the advantages of using a feature-rich proxy. > While this is a good rationale, there are still important use cases for > supporting simple mutual authentication directly in Flink, mainly support > for using standard images in a containerized environment. > There are tools that set up Flink jobs (for example on Kubernetes clusters) > and act as gateways to the Flink REST endpoint and the Flink web interface. > To prevent unauthorised access to Flink, the connectivity has to be secured. > As such a tool acts as a gateway, it is easy to create and pass a shared keystore > and truststore used for mutual authentication to the Flink instances' > configurations. > To enable SSL mutual authentication on REST endpoints, I suggest adding > the configuration parameter `security.ssl.rest.authentication-enabled`, > which defaults to `false`. > If it is set to `true`, the `SSLUtils` factories for creating the REST server > endpoint and the REST clients should set authentication to required and share > `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to set up SSL > mutually authenticated connections. > > I have a working prototype which I would gladly submit as a PR to get further > feedback. The changes to Flink are minimal and the default behaviour won't > change. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
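The proposal can be sketched as a flink-conf.yaml fragment. Note that `security.ssl.rest.authentication-enabled` is the parameter this issue proposes, not an option that exists in the versions listed above, and the paths and passwords are placeholders:

```yaml
# Existing options: enable SSL on the REST endpoint and point at the stores.
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/rest.keystore
security.ssl.rest.keystore-password: changeit
security.ssl.rest.truststore: /path/to/rest.truststore
security.ssl.rest.truststore-password: changeit

# Proposed in this issue (hypothetical until merged): require client
# certificates on REST connections; defaults to false.
security.ssl.rest.authentication-enabled: true
```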
[jira] [Reopened] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration
[ https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10371: --- > Allow to enable SSL mutual authentication on REST endpoints by configuration -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration
[ https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10371: -- Issue Type: New Feature (was: Improvement) > Allow to enable SSL mutual authentication on REST endpoints by configuration -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10172) Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc
[ https://issues.apache.org/jira/browse/FLINK-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10172. - Resolution: Fixed > Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc > - > > Key: FLINK-10172 > URL: https://issues.apache.org/jira/browse/FLINK-10172 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.3, 1.4.2, 1.5.3, 1.6.0, 1.7.0 > Reporter: Rong Rong > Assignee: Rong Rong > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.3.3, 1.4.3, 1.5.4, 1.6.1 > > > The following expression throws an exception when parsing the {{"id.asc"}} term: > {code:java} > Table allOrders = orderTable > .select("id,order_date,amount,customer_id") > .orderBy("id.asc"); > {code} > while it is correctly parsed in Scala: > {code:scala} > val allOrders:Table = orderTable > .select('id, 'order_date, 'amount, 'customer_id) > .orderBy('id.asc) > {code} > This points to an inconsistency between ExpressionParser and ExpressionDsl. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10172) Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc
[ https://issues.apache.org/jira/browse/FLINK-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10172: --- > Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10073) Allow setting a restart strategy in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10073: -- Issue Type: New Feature (was: Improvement) > Allow setting a restart strategy in SQL Client > -- > > Key: FLINK-10073 > URL: https://issues.apache.org/jira/browse/FLINK-10073 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, it is not possible to set a restart strategy per job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10310) Cassandra Sink - Handling failing requests
[ https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10310: --- > Cassandra Sink - Handling failing requests > -- > > Key: FLINK-10310 > URL: https://issues.apache.org/jira/browse/FLINK-10310 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector > Reporter: Jayant Ameta > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The Cassandra sink fails for any kind of error. For some exceptions (e.g. > WriteTimeoutException), ignoring the exception may be acceptable as well. > Can we discuss having a FailureHandler along the lines of > ActionRequestFailureHandler? > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
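The kind of handler being discussed can be sketched in plain Java. All names below are hypothetical, modeled on the Elasticsearch connector's ActionRequestFailureHandler; the Cassandra connector in the versions above exposes no such interface, and `java.util.concurrent.TimeoutException` stands in for the driver's `WriteTimeoutException`:

```java
import java.util.concurrent.TimeoutException;

// Hypothetical failure-handler contract: return true to swallow the failure,
// false to let it fail the job (mirroring ActionRequestFailureHandler's role).
interface CassandraFailureHandler {
    boolean onFailure(Throwable failure);
}

public class FailureHandlerSketch {
    // Sample policy: ignore write timeouts, fail the job on everything else.
    static final CassandraFailureHandler IGNORE_TIMEOUTS =
            failure -> failure instanceof TimeoutException;

    public static void main(String[] args) {
        System.out.println(IGNORE_TIMEOUTS.onFailure(new TimeoutException("write timed out"))); // true
        System.out.println(IGNORE_TIMEOUTS.onFailure(new IllegalStateException("other error"))); // false
    }
}
```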
[jira] [Closed] (FLINK-10073) Allow setting a restart strategy in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10073. - Resolution: Fixed > Allow setting a restart strategy in SQL Client -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10310) Cassandra Sink - Handling failing requests
[ https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10310. --- Resolution: Implemented > Cassandra Sink - Handling failing requests -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10310) Cassandra Sink - Handling failing requests
[ https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10310: -- Issue Type: New Feature (was: Improvement) > Cassandra Sink - Handling failing requests -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10050. - Resolution: Implemented > Support 'allowedLateness' in CoGroupedStreams > - > > Key: FLINK-10050 > URL: https://issues.apache.org/jira/browse/FLINK-10050 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.5.1, 1.6.0 > Reporter: eugen yushin > Priority: Major > Labels: pull-request-available, ready-to-commit, windows > Fix For: 1.7.0 > > > WindowedStream supports the 'allowedLateness' feature, while > CoGroupedStreams does not. At the same time, WindowedStream is an inner part > of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is > simply delegated to WindowedStream. > There is no way to handle late-arriving data from previous steps in > cogroups (and joins). Consider the following flow: > a. read data from source1 -> aggregate data with allowed lateness > b. read data from source2 -> aggregate data with allowed lateness > c. cogroup/join streams a and b, and compare aggregated values > Step c doesn't accept any late data from steps a/b due to the lack of an > `allowedLateness` API call in CoGroupedStreams.java. > Scope: add the method `WithWindow.allowedLateness` to the Java API > (flink-streaming-java) and extend the Scala API (flink-streaming-scala). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
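The delegation asked for in the scope above can be sketched in plain Java. The class names here are stand-ins, not Flink's actual CoGroupedStreams internals; the point is that `allowedLateness`, like trigger and evictor, would simply be forwarded to the underlying windowed-stream configuration:

```java
import java.time.Duration;

// Stand-in for the windowed-stream configuration that already understands
// lateness (in real Flink this lives on WindowedStream).
class WindowSpec {
    Duration allowedLateness = Duration.ZERO;
}

// Stand-in for CoGroupedStreams.WithWindow: it owns a WindowSpec and,
// like trigger/evictor, forwards allowedLateness to it.
public class WithWindowSketch {
    private final WindowSpec spec = new WindowSpec();

    public WithWindowSketch allowedLateness(Duration lateness) {
        spec.allowedLateness = lateness;  // delegate, as with evictor/trigger
        return this;                      // fluent builder style
    }

    public Duration getAllowedLateness() {
        return spec.allowedLateness;
    }
}
```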
[jira] [Reopened] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10050: --- > Support 'allowedLateness' in CoGroupedStreams -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9697. Resolution: Implemented > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: New Feature >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10050: -- Issue Type: New Feature (was: Improvement) > Support 'allowedLateness' in CoGroupedStreams -- This message was sent by Atlassian JIRA (v7.6.3#76005)