[jira] [Commented] (FLINK-10252) Handle oversized metric messages
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656344#comment-16656344 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850#issuecomment-431261877

Hi @zentol, I have not added tests for the new implementation yet, but could you review it first to check that it matches your requirements? Thanks.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Handle oversized metric messages
> --------------------------------
>
>                 Key: FLINK-10252
>                 URL: https://issues.apache.org/jira/browse/FLINK-10252
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: vinoyang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only
> send messages of a smaller size than the current {{akka.framesize}}. We should
> check, similarly to FLINK-10251, whether the payload exceeds the maximum
> framesize and fail fast if it does.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
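The fail-fast check the issue asks for — measure the serialized payload before handing it to the actor system and reject it if it would exceed the framesize — could look roughly like the sketch below. This is an illustration only, not Flink's actual implementation; the class name, the use of plain Java serialization, and the exception type are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch of a fail-fast size check before sending an actor message. */
public final class FramesizeCheck {

    /** Serializes the payload and returns its size in bytes. */
    static int serializedSize(Serializable payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(payload);
        }
        return bos.size();
    }

    /** Throws if the serialized payload would exceed the configured framesize. */
    static void checkFramesize(Serializable payload, int maxFramesizeBytes) throws IOException {
        int size = serializedSize(payload);
        if (size > maxFramesizeBytes) {
            throw new IOException("Metric message of " + size
                + " bytes exceeds maximum framesize of " + maxFramesizeBytes + " bytes");
        }
    }

    public static void main(String[] args) throws IOException {
        checkFramesize("small metric dump", 10_485_760); // fits: no exception
        try {
            checkFramesize(new byte[1024], 16); // clearly too large
        } catch (IOException expected) {
            System.out.println("rejected oversized payload");
        }
    }
}
```

Failing fast on the sender side gives a clear error at the source instead of Akka silently dropping the oversized message downstream.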
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656323#comment-16656323 ]

ASF GitHub Bot commented on FLINK-10436:

zentol commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431256748

> If we now introduce a RestOption#DISPATCHER_ADDRESS with the fallback to RestOption#ADDRESS and JobManagerOptions#ADDRESS and remove the latter as a deprecated key from the former, then we break setups.
> In the future, this might change and could require to remove the fallback dependency depending on what the default behaviour is.

This also applies to your proposed solution for handling this issue in the future. Although ... is this really an issue? We can change all usages within Flink to `DISPATCHER_ADDRESS` (which should be reasonable considering that we only ever go to the dispatcher via REST), with the exception of the `RestServerEndpoint`, for which we map `DISPATCHER_ADDRESS` to `REST#ADDRESS`. What could this break? I can only come up with a custom `ClusterEntrypoint` that uses a `RestServerEndpoint` internally and no other Flink component that is aware of the configuration, which seems unlikely.

> Example config uses deprecated key jobmanager.rpc.address
> ---------------------------------------------------------
>
>                 Key: FLINK-10436
>                 URL: https://issues.apache.org/jira/browse/FLINK-10436
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Startup Shell Scripts
>    Affects Versions: 1.7.0
>            Reporter: Ufuk Celebi
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
> has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged:
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.
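The fallback-key lookup being discussed can be pictured as: try the option's own key first, then each fallback key in order, warning when a fallback is hit. The sketch below is a hand-rolled illustration of that resolution order, not Flink's `ConfigOption` implementation; the method names and warning text are assumptions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Sketch of fallback-key resolution, loosely modeled on ConfigOption#withFallbackKeys. */
public final class FallbackConfig {

    /** Looks up 'key' first, then each fallback key in order; warns on a fallback hit. */
    static Optional<String> resolve(Map<String, String> conf, String key, List<String> fallbackKeys) {
        if (conf.containsKey(key)) {
            return Optional.of(conf.get(key));
        }
        for (String fallback : fallbackKeys) {
            if (conf.containsKey(fallback)) {
                System.out.println("Config uses fallback key '" + fallback
                    + "' instead of proper key '" + key + "'");
                return Optional.of(conf.get(fallback));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("jobmanager.rpc.address", "localhost");
        // 'rest.address' is unset, so the deprecated key is consulted as a fallback.
        Optional<String> addr =
            resolve(conf, "rest.address", Arrays.asList("jobmanager.rpc.address"));
        System.out.println(addr.orElse("<unset>"));
    }
}
```

The distinction debated in the thread is whether a fallback key should also emit the deprecation warning shown in the issue, or be treated as a silent, still-supported alias.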
[jira] [Assigned] (FLINK-10603) Reduce kafka test duration
[ https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-10603:

    Assignee: vinoyang

> Reduce kafka test duration
> --------------------------
>
>                 Key: FLINK-10603
>                 URL: https://issues.apache.org/jira/browse/FLINK-10603
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kafka Connector, Tests
>    Affects Versions: 1.7.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Major
>
> The tests for the modern kafka connector take more than 10 minutes, which is
> simply unacceptable.
[jira] [Commented] (FLINK-10552) Provide RichAsyncFunction for scala API
[ https://issues.apache.org/jira/browse/FLINK-10552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656315#comment-16656315 ]

ASF GitHub Bot commented on FLINK-10552:

Clarkkkkk opened a new pull request #6878: [FLINK-10552][DataStream API] Add support for RichAsyncFunction in Scala API
URL: https://github.com/apache/flink/pull/6878

## What is the purpose of the change

Support RichAsyncFunction in the Scala API.

## Brief change log

- Change async-related RuntimeContext classes to public
- Add a Scala version of RichAsyncFunction

## Verifying this change

The runtime context has been tested in RichAsyncFunctionTest.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no

> Provide RichAsyncFunction for scala API
> ---------------------------------------
>
>                 Key: FLINK-10552
>                 URL: https://issues.apache.org/jira/browse/FLINK-10552
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Major
>
> Currently, only the Java API provides a RichAsyncFunction abstract class,
> while the Scala API does not. It would be nice to provide the same
> functionality in the Scala API.
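What a "rich" async function adds over a plain async function is lifecycle hooks (`open`/`close`) with access to runtime state, combined with callback-style asynchronous invocation. The toy Java classes below illustrate that pattern only; they are stand-ins, not Flink's actual `RichAsyncFunction`/`ResultFuture` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Toy stand-in for a "rich" async function: lifecycle hooks plus async invocation. */
abstract class SketchRichAsyncFunction<IN, OUT> {
    // "Rich" lifecycle hooks: a plain async function lacks these.
    public void open() throws Exception {}
    public void close() throws Exception {}
    // Asynchronous invocation: results are handed to a callback, not returned.
    public abstract void asyncInvoke(IN input, Consumer<OUT> resultFuture);
}

public final class RichAsyncDemo extends SketchRichAsyncFunction<Integer, String> {
    private String prefix;

    @Override
    public void open() { prefix = "value-"; } // e.g. initialize a client connection

    @Override
    public void asyncInvoke(Integer input, Consumer<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> prefix + input) // simulated async lookup
            .thenAccept(resultFuture);
    }

    public static void main(String[] args) throws Exception {
        RichAsyncDemo fn = new RichAsyncDemo();
        fn.open();
        fn.asyncInvoke(7, System.out::println);
        Thread.sleep(200); // crude wait for the async result in this demo
        fn.close();
    }
}
```

The PR's point is that without the "rich" variant, Scala users had no sanctioned place (like `open()`) to set up per-task state such as connections before async calls begin.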
[jira] [Assigned] (FLINK-10531) State TTL RocksDb backend end-to-end test failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Renjie Liu reassigned FLINK-10531:

    Assignee: Renjie Liu

> State TTL RocksDb backend end-to-end test failed on Travis
> ----------------------------------------------------------
>
>                 Key: FLINK-10531
>                 URL: https://issues.apache.org/jira/browse/FLINK-10531
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.6.1
>            Reporter: Till Rohrmann
>            Assignee: Renjie Liu
>            Priority: Critical
>              Labels: test-stability
>
> The {{State TTL RocksDb backend end-to-end test}} end-to-end test failed on
> Travis.
> https://travis-ci.org/apache/flink/jobs/438226190
> https://api.travis-ci.org/v3/job/438226190/log.txt
[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656242#comment-16656242 ]

ASF GitHub Bot commented on FLINK-10491:

zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#issuecomment-431236371

@pnowojski, thanks for the reviews again! I mainly have two concerns about the tests from your comments.

1. How should we define the scope of this PR? I found another common issue: the newly added tests may trigger some unreasonable historical tests. If we fixed all the related historical tests in this PR, it would not be good for the convergence of this PR.
2. How should we evaluate the use of mock, spy, etc. in tests? I have put some thoughts in the comments.

BTW, I learned some good ideas from your test reviews, so I think I will focus on polishing some existing historical tests if I have time. :)

> Deadlock during spilling data in SpillableSubpartition
> ------------------------------------------------------
>
>                 Key: FLINK-10491
>                 URL: https://issues.apache.org/jira/browse/FLINK-10491
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.4, 1.6.1
>            Reporter: Piotr Nowojski
>            Assignee: zhijiang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
> Originally reported here:
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from the 1.5.3 version) showing two deadlocked threads, because
> they are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor entry
> java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release lock on <0x2dfd> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.
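The dump above is the classic lock-ordering cycle: one thread holds the buffer-pool lock and waits for the segment lock, while the other holds them in the reverse order. The standard fix is a single global acquisition order on every code path, as in this toy illustration (not Flink code; the lock names are stand-ins):

```java
/** Minimal illustration of the lock-ordering fix for the deadlock in the dump above. */
public final class LockOrdering {
    private static final Object POOL_LOCK = new Object();    // stand-in for the LocalBufferPool lock
    private static final Object SEGMENT_LOCK = new Object(); // stand-in for the NetworkBufferPool lock

    // Both code paths take POOL_LOCK before SEGMENT_LOCK, so no wait cycle can form.
    static void releaseMemory() {
        synchronized (POOL_LOCK) {
            synchronized (SEGMENT_LOCK) { /* redistribute buffers */ }
        }
    }

    static void recycle() {
        synchronized (POOL_LOCK) {       // same order as releaseMemory(), never reversed
            synchronized (SEGMENT_LOCK) { /* return a memory segment */ }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> { for (int i = 0; i < 10_000; i++) releaseMemory(); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 10_000; i++) recycle(); });
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("no deadlock");
    }
}
```

If `recycle()` took `SEGMENT_LOCK` first, the two loops above could block each other forever, exactly as the `DataSink` and `Map` threads do in the dump.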
[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656239#comment-16656239 ]

TisonKun commented on FLINK-10540:

Yes, I see. And in testing code its subclasses ({{LocalFlinkMiniCluster}} and {{TestingCluster}}) are still in use; we might want to get rid of those uses, too. Otherwise they become blockers to this.

> Remove legacy FlinkMiniCluster
> ------------------------------
>
>                 Key: FLINK-10540
>                 URL: https://issues.apache.org/jira/browse/FLINK-10540
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: Shimin Yang
>            Priority: Major
>             Fix For: 1.7.0
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer
> be used.
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656237#comment-16656237 ]

ASF GitHub Bot commented on FLINK-10527:

yanghua commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226526901

## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java

@@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
 	/**
 	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
 	 */
+	@Ignore
 	@Test(timeout = 100000) // timeout after 100 seconds
 	public void testTaskManagerFailure() throws Exception {
-		assumeTrue("The new mode does not start TMs upfront.", !isNewMode);

Review comment:

@tillrohrmann So, is your review suggestion still the same? Do we still need to keep the isNewMode field? (About these methods, I have created a [separate issue](https://issues.apache.org/jira/browse/FLINK-10558) to track them, so we will not forget them in the end.)

> Cleanup constant isNewMode in YarnTestBase
> ------------------------------------------
>
>                 Key: FLINK-10527
>                 URL: https://issues.apache.org/jira/browse/FLINK-10527
>             Project: Flink
>          Issue Type: Sub-task
>          Components: YARN
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> This seems to be a residual problem from FLINK-10396, where the constant was
> set to true. Currently it has three usage scenarios:
> 1. In an assertion, where it causes an error:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode): the logic in the block is never invoked, so the if block can be removed.
> 3. if (isNewMode): the block is always invoked, so the if statement can be removed.
[jira] [Comment Edited] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656235#comment-16656235 ]

Shimin Yang edited comment on FLINK-10540 at 10/19/18 3:37 AM:

Looks like it's only used by the flink-storm example in production code. Better to start after storm is removed [~Tison].

was (Author: dangdangdang):
Looks like it's only used by the flink-storm example. Better to start after storm is removed [~Tison].

> Remove legacy FlinkMiniCluster
> ------------------------------
>
>                 Key: FLINK-10540
>                 URL: https://issues.apache.org/jira/browse/FLINK-10540
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: Shimin Yang
>            Priority: Major
>             Fix For: 1.7.0
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer
> be used.
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers trying to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656236#comment-16656236 ]

Paul Lin commented on FLINK-8098:

+1 for this issue. We get the same problem on Flink 1.5.3 with CDH-5.6.0.

> LeaseExpiredException when using FsStateBackend for checkpointing due to
> multiple mappers trying to access the same file.
> ------------------------------------------------------------------------
>
>                 Key: FLINK-8098
>                 URL: https://issues.apache.org/jira/browse/FLINK-8098
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>        Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP
>            Reporter: Shashank Agarwal
>            Priority: Major
>
> I am running a streaming application with parallelism 6. I have enabled
> checkpointing (1000). But the application crashes after 1-2 days. After
> analysing the logs I found the following trace:
> {code}
> 2017-11-17 11:19:06,696 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask - Could not properly clean up the async checkpoint runnable.
> java.lang.Exception: Could not properly cancel managed keyed state future.
> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983)
> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
> at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251)
> at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
> ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 in order to obtain the stream state handle
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 (inode 812148671): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161]
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659)
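The `LeaseExpiredException` above occurs when a second HDFS client writes to (or deletes) a file that another client still holds a write lease on. A common mitigation pattern, sketched below, is to make each writer's file name unique, e.g. by embedding the subtask index and a random UUID; this is only an illustration of the idea, not Flink's actual checkpoint-path scheme or fix for this issue.

```java
import java.util.UUID;

/** Sketch: per-writer unique file names so concurrent writers cannot collide on one HDFS path. */
public final class UniqueCheckpointPath {

    static String checkpointFile(String baseDir, long checkpointId, int subtaskIndex) {
        // Embedding the subtask index and a UUID makes lease conflicts between
        // subtasks of the same checkpoint impossible by construction.
        return String.format("%s/chk-%d/%d-%s", baseDir, checkpointId, subtaskIndex,
            UUID.randomUUID());
    }

    public static void main(String[] args) {
        String a = checkpointFile("hdfs://nn:8020/flink/app", 40191, 0);
        String b = checkpointFile("hdfs://nn:8020/flink/app", 40191, 1);
        System.out.println(a);
        System.out.println(b);
        System.out.println("distinct: " + !a.equals(b));
    }
}
```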
[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656235#comment-16656235 ]

Shimin Yang commented on FLINK-10540:

Looks like it's only used by the flink-storm example. Better to start after storm is removed [~Tison].

> Remove legacy FlinkMiniCluster
> ------------------------------
>
>                 Key: FLINK-10540
>                 URL: https://issues.apache.org/jira/browse/FLINK-10540
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: Shimin Yang
>            Priority: Major
>             Fix For: 1.7.0
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer
> be used.
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656234#comment-16656234 ]

ASF GitHub Bot commented on FLINK-9970:

yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226526582

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala

@@ -266,6 +268,21 @@ object ScalarFunctions {
     regexp_extract(str, regex, 0)
   }

+  /**
+    * Returns the ASCII code value of the leftmost character of the string str.
+    */
+  def ascii(str: String): Integer = {
+    if (str == null || str.equals("")) {
+      0
+    } else {
+      if (CharMatcher.ASCII.matches(str.charAt(0))) {
+        str.charAt(0).toByte.toInt
+      } else {
+        0

Review comment:

Hi @xccui, I tried changing this method to return null instead of the original 0, but both related tests then throw an NPE; I don't know whether it is related to the Integer return type. Can you try it?

> Add ASCII/CHR function for table/sql API
> ----------------------------------------
>
>                 Key: FLINK-9970
>                 URL: https://issues.apache.org/jira/browse/FLINK-9970
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Minor
>              Labels: pull-request-available
>
> For the ASCII function, refer to:
> https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii
> For the CHR function, which converts an ASCII code to a character, refer to:
> https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html
> Considering that "CHAR" is a keyword in many databases, we use the "CHR"
> keyword.
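The NPE yanghua describes is consistent with auto-unboxing: a method with a boxed `Integer` return type can return null, but the moment a caller (or generated code) unboxes that value into a primitive `int`, a `NullPointerException` is thrown. A minimal Java illustration of this mechanism (the Flink/Scala test specifics may differ; `asciiOrNull` is a made-up name):

```java
/** Demonstrates the NPE that unboxing a null Integer produces. */
public final class UnboxNpe {

    // Boxed return type: null is representable here.
    static Integer asciiOrNull(String s) {
        return (s == null || s.isEmpty()) ? null : (int) s.charAt(0);
    }

    public static void main(String[] args) {
        Integer boxed = asciiOrNull(""); // fine: the boxed reference stays null
        System.out.println(boxed);
        try {
            int primitive = asciiOrNull(""); // unboxing null throws here
            System.out.println(primitive);
        } catch (NullPointerException e) {
            System.out.println("NPE on unboxing a null Integer");
        }
    }
}
```

So if the surrounding generated code or tests ever treat the function's result as a primitive, a null return cannot survive the boundary, which would explain the failures observed.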
[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656231#comment-16656231 ] ASF GitHub Bot commented on FLINK-10491: zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool URL: https://github.com/apache/flink/pull/6809#discussion_r226525858 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ## @@ -205,6 +216,66 @@ protected void testAddOnPartition(final ResultPartitionType pipelined) } } + @Test + public void testReleaseMemoryOnBlockingPartition() throws Exception { + testReleaseMemory(ResultPartitionType.BLOCKING); + } + + @Test + public void testReleaseMemoryOnPipelinedPartition() throws Exception { + testReleaseMemory(ResultPartitionType.PIPELINED); + } + + /** +* Tests {@link ResultPartition#releaseMemory(int)} on a working partition. +* +* @param resultPartitionType the result partition type to set up +*/ + private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception { + final int numBuffers = 10; + final NetworkEnvironment network = new NetworkEnvironment( + new NetworkBufferPool(numBuffers, 128), + new LocalConnectionManager(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + null, + IOManager.IOMode.SYNC, + 0, + 0, + 2, + 8, + true); + final ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + final ResultPartition resultPartition = spy(createPartition(notifier, resultPartitionType, false)); Review comment: I also considered that the `spy` usage here may raise some concerns, and I thought about the approach you mentioned for avoiding the `spy`. Let me explain why I used `spy` in the end. I think there should be two separate tests, verifying different logic along two dimensions: 1.
Whether the `ResultPartition` is correctly assigned as the `BufferPoolOwner` when the `BufferPool` is created for the different partition types; this test verifies that relationship. 2. Verifying the logic of the `BufferPoolOwner#releaseMemory` implementation, which is also missing at the moment, as I mentioned in previous comments. I think that is outside the scope of this PR, though, and I am willing to open a JIRA for it. Initially I thought the first verification did not belong in this PR either; it only came up because migrating the historical code revealed historically missing tests. So if we want to check the state of the `NetworkBufferPool` to avoid the `spy` here, we would have to touch the detailed logic of `BufferPoolOwner#releaseMemory` mentioned in the second part, whereas the first part alone is enough for this PR to guard against mis-assigning the `BufferPoolOwner`. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deadlock during spilling data in SpillableSubpartition > --- > > Key: FLINK-10491 > URL: https://issues.apache.org/jira/browse/FLINK-10491 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.4, 1.6.1 >Reporter: Piotr Nowojski >Assignee: zhijiang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Originally reported here: > [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E] > Thread dump (from 1.5.3 version) showing two deadlocked threads, because they > are taking two locks in different order: > {noformat} > Thread-1 > "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA > waiting for monitor entry > waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223) > at > org.apache.flink.runtime.io.networ
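The thread dump in the issue description shows the classic deadlock shape: two threads acquiring the same two monitors in opposite order. The sketch below illustrates the conventional fix — a single global lock-acquisition order — using hypothetical locks, not Flink's actual classes:

```java
public class LockOrdering {
    // Hypothetical monitors standing in for the two locks in the thread dump
    // (the LocalBufferPool ArrayDeque and the NetworkBufferPool lock). The
    // deadlock occurs when one thread holds the first lock and waits for the
    // second while another thread does the reverse. With a single global
    // acquisition order, no cycle of lock-wait dependencies can form.
    static final Object poolLock = new Object();
    static final Object queueLock = new Object();
    static int counter = 0;

    static void safeOperation() {
        // Every code path takes poolLock before queueLock.
        synchronized (poolLock) {
            synchronized (queueLock) {
                counter++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) safeOperation(); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) safeOperation(); });
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter); // 2000: both threads completed, no deadlock
    }
}
```

If both call sites cannot be changed to agree on an order, the alternative is to release the first lock before acquiring the second, which is essentially what the linked fix discussions revolve around.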
[jira] [Commented] (FLINK-8483) Implement and expose outer joins
[ https://issues.apache.org/jira/browse/FLINK-8483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656229#comment-16656229 ] ASF GitHub Bot commented on FLINK-8483: --- xccui commented on issue #6874: [FLINK-8483][DataStream] Implement and expose outer joins URL: https://github.com/apache/flink/pull/6874#issuecomment-431232789 Hi @florianschmidt1994 , thanks for working on this! Before looking into the code, I'd like to share two thoughts. 1. I think there's no need to register a clean-up timer for each record. In [TimeBoundedJoin](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala), only the first timer is registered in the `processElement()` method, and subsequent timers are registered in a "chained manner" after each clean-up. 2. Without holding back watermarks, the join results may be treated as outdated by downstream operators (depending on the mechanism used for rowtime propagation), which makes the join operation less applicable. What do you think? Best, Xingcan This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement and expose outer joins > > > Key: FLINK-8483 > URL: https://issues.apache.org/jira/browse/FLINK-8483 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
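The "chained manner" of timer registration can be modeled without the Flink APIs: keep at most one pending cleanup timer, register the first one in `processElement()`, and register each subsequent one only when the previous timer fires. A simplified, deterministic sketch of that bookkeeping (hypothetical names; not the actual TimeBoundedStreamJoin code):

```java
public class ChainedTimers {
    static int timersRegistered = 0;
    static long pendingTimer = -1; // -1 means no cleanup timer is pending

    // Called once per record: register a cleanup timer only if none is
    // pending, instead of one timer per record.
    static void processElement(long recordTime, long ttl) {
        if (pendingTimer < 0) {
            pendingTimer = recordTime + ttl;
            timersRegistered++;
        }
    }

    // Called when the pending timer fires: after cleaning up, chain the next
    // timer only if expirable state remains; otherwise go idle.
    static void onTimer(long firedTime, boolean moreStateRemains, long ttl) {
        if (moreStateRemains) {
            pendingTimer = firedTime + ttl;
            timersRegistered++;
        } else {
            pendingTimer = -1;
        }
    }

    public static void main(String[] args) {
        for (long t = 0; t < 100; t++) {
            processElement(t, 1000);
        }
        // One timer covers all 100 records, instead of 100 timers.
        System.out.println(timersRegistered);
    }
}
```

The saving is in timer-service state: the per-record scheme keeps O(records) timers registered, the chained scheme at most one per key.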
[GitHub] sunjincheng121 commented on a change in pull request #6873: [hotfix] Add Review Progress section to PR description template.
sunjincheng121 commented on a change in pull request #6873: [hotfix] Add Review Progress section to PR description template. URL: https://github.com/apache/flink/pull/6873#discussion_r226525062 ## File path: .github/PULL_REQUEST_TEMPLATE.md ## @@ -70,3 +70,17 @@ This change added tests and can be verified as follows: - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) + + + +# Review Progress + +**NOTE: THE REVIEW PROGRESS MUST ONLY BE UPDATED BY AN APACHE FLINK COMMITTER!** +* [ ] 1. The contribution is well-described. +* [ ] 2. There is consensus that the contribution should go into to Flink. +* [ ] 3. [Does not need specific attention | Needs specific attention for X | Has attention for X by Y] Review comment: About [3]: if the committer identifies [Needs specific attention for X], what do we need to do with the PR? From my point of view, the committer who updates the review progress has to make sure that all the specific-attention parts are resolved. So [3] should be described as: [ ] 3. All the specific attention parts are well-solved. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656218#comment-16656218 ] TisonKun commented on FLINK-10540: -- Great take [~dangdangdang]! > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > {{FlinkMiniCluster}} is based on legacy cluster mode and should be no longer > used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656215#comment-16656215 ] ASF GitHub Bot commented on FLINK-10491: zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool URL: https://github.com/apache/flink/pull/6809#discussion_r226523253 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ## @@ -205,6 +216,66 @@ protected void testAddOnPartition(final ResultPartitionType pipelined) } } + @Test + public void testReleaseMemoryOnBlockingPartition() throws Exception { + testReleaseMemory(ResultPartitionType.BLOCKING); + } + + @Test + public void testReleaseMemoryOnPipelinedPartition() throws Exception { + testReleaseMemory(ResultPartitionType.PIPELINED); + } + + /** +* Tests {@link ResultPartition#releaseMemory(int)} on a working partition. +* +* @param resultPartitionType the result partition type to set up +*/ + private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception { + final int numBuffers = 10; + final NetworkEnvironment network = new NetworkEnvironment( + new NetworkBufferPool(numBuffers, 128), + new LocalConnectionManager(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + null, + IOManager.IOMode.SYNC, + 0, + 0, + 2, + 8, + true); + final ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); Review comment: I checked these two implementations and they are exactly the same, both empty operations, so I think we should open a separate JIRA to unify the two existing implementations and make the result public for outside usage. Regarding the mock issue, I have some concerns: 1. Do you think there is any difference between a mock and an empty operation in a real class? 2.
There are already many usages of mock in the existing tests; do you think it is necessary to replace them with real classes? If so, I am willing to open a JIRA for them. :) 3. I think unit tests are mainly for verifying partial logic or processes, so if the behavior of some classes is not of interest in the target test, we can mock those objects to keep things simple. Otherwise we can develop an ITCase for end-to-end verification with all real classes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deadlock during spilling data in SpillableSubpartition > --- > > Key: FLINK-10491 > URL: https://issues.apache.org/jira/browse/FLINK-10491 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.4, 1.6.1 >Reporter: Piotr Nowojski >Assignee: zhijiang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Originally reported here: > [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E] > Thread dump (from 1.5.3 version) showing two deadlocked threads, because they > are taking two locks in different order: > {noformat} > Thread-1 > "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA > waiting for monitor entry > waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355) > - locked <0x2dfd> (a java.util.ArrayDeque) > at >
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203) > - locked <0x2da5>
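The trade-off under discussion — a Mockito `mock(...)` versus one shared no-op implementation — can be illustrated with a simplified notifier interface (hypothetical; the real ResultPartitionConsumableNotifier in flink-runtime has a different signature):

```java
public class TestDoubleDemo {
    // Simplified stand-in for ResultPartitionConsumableNotifier.
    interface ConsumableNotifier {
        void notifyPartitionConsumable(String partitionId);
    }

    // The reviewer's suggestion: one shared, public no-op implementation
    // instead of several private empty ones or mock(...) calls. Behaviorally
    // it matches a bare Mockito mock, whose unstubbed void methods also do
    // nothing, but it avoids the mocking-framework machinery.
    static final class NoOpNotifier implements ConsumableNotifier {
        @Override
        public void notifyPartitionConsumable(String partitionId) {
            // intentionally empty: the notification path is not under test
        }
    }

    // A hand-written recording double, for tests that do care about the calls.
    static final class RecordingNotifier implements ConsumableNotifier {
        int notifications = 0;

        @Override
        public void notifyPartitionConsumable(String partitionId) {
            notifications++;
        }
    }

    public static void main(String[] args) {
        RecordingNotifier recording = new RecordingNotifier();
        recording.notifyPartitionConsumable("partition-0");
        System.out.println(recording.notifications); // 1

        new NoOpNotifier().notifyPartitionConsumable("partition-0"); // no effect
    }
}
```

A no-op class answers concern 1 directly: for a void method there is no observable difference from an unstubbed mock, so the choice is about readability and dependency weight rather than behavior.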
[jira] [Assigned] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10540: --- Assignee: Shimin Yang > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > {{FlinkMiniCluster}} is based on legacy cluster mode and should be no longer > used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656201#comment-16656201 ] ASF GitHub Bot commented on FLINK-10491: zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool URL: https://github.com/apache/flink/pull/6809#discussion_r226520720 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ## @@ -205,6 +216,66 @@ protected void testAddOnPartition(final ResultPartitionType pipelined) } } + @Test + public void testReleaseMemoryOnBlockingPartition() throws Exception { + testReleaseMemory(ResultPartitionType.BLOCKING); + } + + @Test + public void testReleaseMemoryOnPipelinedPartition() throws Exception { + testReleaseMemory(ResultPartitionType.PIPELINED); + } + + /** +* Tests {@link ResultPartition#releaseMemory(int)} on a working partition. +* +* @param resultPartitionType the result partition type to set up +*/ + private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception { + final int numBuffers = 10; + final NetworkEnvironment network = new NetworkEnvironment( Review comment: Yes, I agree with the option of providing a `NetworkEnvironmentBuilder` nested in `NetworkEnvironment`. But I think it is worth opening a separate JIRA to unify all the related historical usages, because that is not caused by this PR, and this PR would then be based on the new JIRA. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deadlock during spilling data in SpillableSubpartition > --- > > Key: FLINK-10491 > URL: https://issues.apache.org/jira/browse/FLINK-10491 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.4, 1.6.1 >Reporter: Piotr Nowojski >Assignee: zhijiang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Originally reported here: > [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E] > Thread dump (from 1.5.3 version) showing two deadlocked threads, because they > are taking two locks in different order: > {noformat} > Thread-1 > "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA > waiting for monitor entry > waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355) > - locked <0x2dfd> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203) > - locked <0x2da5> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259) > at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47) > at > org.apache.flink.runtime.
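A `NetworkEnvironmentBuilder` along those lines would replace the long positional constructor call in the test diff with named setters and defaults. A hypothetical sketch (the field names and default values are assumptions for illustration, not the actual Flink API):

```java
public class NetworkConfigBuilderDemo {
    // Hypothetical, heavily simplified config object; the real
    // NetworkEnvironment constructor in flink-runtime takes many more
    // collaborators (connection manager, partition manager, etc.).
    static final class NetworkConfig {
        final int numBuffers;
        final int segmentSize;

        NetworkConfig(int numBuffers, int segmentSize) {
            this.numBuffers = numBuffers;
            this.segmentSize = segmentSize;
        }
    }

    // The builder gives every parameter a named setter and a default, so a
    // test only spells out the values it actually cares about.
    static final class Builder {
        private int numBuffers = 1024;        // assumed default
        private int segmentSize = 32 * 1024;  // assumed default

        Builder setNumBuffers(int numBuffers) {
            this.numBuffers = numBuffers;
            return this;
        }

        Builder setSegmentSize(int segmentSize) {
            this.segmentSize = segmentSize;
            return this;
        }

        NetworkConfig build() {
            return new NetworkConfig(numBuffers, segmentSize);
        }
    }

    public static void main(String[] args) {
        // Compare with the 13-argument positional call in the diff above:
        // only the overridden value appears at the call site.
        NetworkConfig config = new Builder().setNumBuffers(10).build();
        System.out.println(config.numBuffers + " buffers of " + config.segmentSize + " bytes");
    }
}
```

Besides readability, such a builder keeps test call sites stable when a new constructor parameter is added, which is exactly the churn the review thread is trying to avoid.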
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656200#comment-16656200 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r226520617 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 +} else { + str.charAt(0).toByte.toInt Review comment: I think it is really necessary to add an ASCII judgment here. I misunderstood what you mean. I thought you were talking about [this issue]( https://github.com/apache/flink/pull/6432#discussion_r205512482). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
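A version of `ascii()` with the explicit ASCII-range check being discussed might look like the following (sketched in Java rather than the Scala of the diff; the behavior chosen for non-ASCII input is an assumption, since the thread has not settled on a contract):

```java
public class AsciiFn {
    // Returns the code of the leftmost character, 0 for null/empty input
    // (matching the Scala diff), and null for non-ASCII input instead of the
    // silent truncation performed by str.charAt(0).toByte.toInt. Returning
    // null here is an illustrative choice; throwing would also be defensible.
    static Integer ascii(String str) {
        if (str == null || str.isEmpty()) {
            return 0;
        }
        char first = str.charAt(0);
        return first <= 127 ? Integer.valueOf(first) : null;
    }

    public static void main(String[] args) {
        System.out.println(ascii("abc")); // 97
        System.out.println(ascii(""));    // 0
        System.out.println(ascii("中"));  // null: outside the ASCII range
    }
}
```

The point of the guard is visibility: `'中'.toByte.toInt` in the original diff would quietly yield a wrong negative value, whereas an explicit check forces the semantics for non-ASCII input to be decided.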
[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656183#comment-16656183 ] ASF GitHub Bot commented on FLINK-10166: leanken commented on issue #6863: [FLINK-10166][table] Replace commons.codec.binary.Base64 with java.util.Base64 URL: https://github.com/apache/flink/pull/6863#issuecomment-431223615 let me fix TestCase build break first. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > > When tried to run query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
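For reference, the JDK replacement is straightforward: `java.util.Base64` has shipped with the JDK since Java 8, so generated code no longer needs commons-codec on its classpath for the basic encode/decode calls:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64Migration {
    public static void main(String[] args) {
        byte[] raw = "hello flink".getBytes(StandardCharsets.UTF_8);

        // Equivalent to commons-codec's Base64.encodeBase64String / decodeBase64.
        String encoded = Base64.getEncoder().encodeToString(raw);
        byte[] decoded = Base64.getDecoder().decode(encoded);

        System.out.println(encoded);                                      // aGVsbG8gZmxpbms=
        System.out.println(new String(decoded, StandardCharsets.UTF_8));  // hello flink
    }
}
```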
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656173#comment-16656173 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r226516362 ## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ## @@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException { this.offset = splitStart; if (this.splitStart != 0) { + setBomFileCharset(split); Review comment: I have two questions about this commit. Regarding the first suggestion: users often cannot know the file's encoding accurately. For example, a file encoded as `UTF-16LE` with a BOM header will report an error if the user specifies `UTF-16BE`, and I believe UTF files that carry a BOM will be the majority. So I think it is necessary to do the BOM detection first, which gives a better user experience. Regarding the fourth recommendation: `GenericCsvInputFormat` cannot seek to position 0; it calls the `seek` method of `InputStreamFSInputWrapper`, which currently cannot seek to position 0. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > UTF-16 support for TextInputFormat > -- > > Key: FLINK-10134 > URL: https://issues.apache.org/jira/browse/FLINK-10134 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.2 >Reporter: David Dreyfus >Priority: Critical > Labels: pull-request-available > > It does not appear that Flink supports a charset encoding of "UTF-16". 
In > particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) > to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. > > TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), > which sets TextInputFormat.charsetName and then modifies the previously set > delimiterString to construct the proper byte string encoding of the > delimiter. This same charsetName is also used in TextInputFormat.readRecord() > to interpret the bytes read from the file. > > There are two problems that this implementation would seem to have when using > UTF-16. > # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will > return a Big Endian byte sequence including the Byte Order Mark (BOM). The > actual text file will not contain a BOM at each line ending, so the delimiter > will never be read. Moreover, if the actual byte encoding of the file is > Little Endian, the bytes will be interpreted incorrectly. > # TextInputFormat.readRecord() will not see a BOM each time it decodes a > byte sequence with the String(bytes, offset, numBytes, charset) call. > Therefore, it will assume Big Endian, which may not always be correct. [1] > [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95] > > While there are likely many solutions, I would think that all of them would > have to start by reading the BOM from the file when a Split is opened and > then using that BOM to modify the specified encoding to a BOM-specific one > when the caller doesn't specify one, and to overwrite the caller's > specification if the BOM is in conflict with the caller's specification. That > is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, > Flink should rewrite the charsetName as UTF-16LE. > I hope this makes sense and that I haven't been testing incorrectly or > misreading the code. > > I've verified the problem on version 1.4.2. 
I believe the problem exists on > all versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
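To make the BOM discussion above concrete, here is a hypothetical, self-contained Java sketch of the detection step (names like `detectUtf16Charset` are illustrative assumptions, not Flink's actual `setBomFileCharset` implementation): peek at the first two bytes of a split and derive the concrete UTF-16 endianness, falling back to the caller-specified charset only when no BOM is present.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BomCharsetSketch {

    /**
     * Returns "UTF-16BE" or "UTF-16LE" based on the BOM at the start of the
     * stream, or the caller-specified fallback when no BOM is found.
     * Illustrative sketch only; a real implementation would also have to
     * un-read or skip the BOM bytes before record parsing begins.
     */
    static String detectUtf16Charset(InputStream in, String fallback) throws IOException {
        int b0 = in.read();
        int b1 = in.read();
        if (b0 == 0xFE && b1 == 0xFF) {
            return "UTF-16BE"; // big-endian BOM: FE FF
        } else if (b0 == 0xFF && b1 == 0xFE) {
            return "UTF-16LE"; // little-endian BOM: FF FE
        }
        return fallback; // no BOM: keep the caller-specified charset
    }

    public static void main(String[] args) throws IOException {
        byte[] le = {(byte) 0xFF, (byte) 0xFE, 'h', 0};
        // BOM wins over the caller's conflicting "UTF-16BE" specification
        System.out.println(detectUtf16Charset(new ByteArrayInputStream(le), "UTF-16BE"));
    }
}
```

Under this rule a caller-specified UTF-16BE is overridden to UTF-16LE when the file starts with the FF FE little-endian BOM, which is the conflict-resolution behavior the issue description asks for.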
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656159#comment-16656159 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r226515985 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 Review comment: OK, I can return `null`. In the Java world, Integer is an object, and returning null is fine from a grammatical and semantic point of view. However, when we use ascii('') as input to other operators, such as xxxFunction(ascii('')), the subsequent effects of null and 0 are completely different: null may produce an NPE or other behavior, while 0 will not. If you have already considered this, then I have no problem here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function converts an ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" is always a keyword in many databases, we use the "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
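The null-vs-0 trade-off debated above can be sketched in a minimal, hypothetical Java form (illustrative only; the actual Flink function is the Scala `ascii` shown in the diff): return SQL NULL for null/empty input so downstream operators can distinguish "no value" from a real code point of 0.

```java
public class AsciiSketch {

    /**
     * Null-propagating variant: ascii(null) and ascii("") yield null (SQL NULL)
     * instead of 0, so the result is distinguishable from ascii("\0") == 0.
     */
    static Integer ascii(String str) {
        if (str == null || str.isEmpty()) {
            return null; // SQL NULL, not 0
        }
        // Code point of the leftmost character; matches ASCII for 7-bit input.
        // (The reviewed Scala code uses charAt(0).toByte.toInt, which differs
        // for characters outside the ASCII range.)
        return (int) str.charAt(0);
    }

    public static void main(String[] args) {
        System.out.println(ascii("flink")); // 'f'
        System.out.println(ascii(""));      // null
    }
}
```

As the review thread notes, callers wrapping this in another function, e.g. the hypothetical xxxFunction(ascii('')), must then handle the null explicitly rather than silently receiving 0.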
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656144#comment-16656144 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r226513490 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 Review comment: OK, will change it This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656141#comment-16656141 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-431216581 @alexeyt820 I have moved these two issues into FLINK-10598 as sub issues. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656140#comment-16656140 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-431216581 @alexeyt820 I have moved these two issues into FLINK-10598 as a sub issue. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656137#comment-16656137 ] vinoyang commented on FLINK-8500: - Hi, if the modern kafka connector also has this issue, Shall we remove *Kafka010Fetcher* from the title of the issue? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-8500: Issue Type: Sub-task (was: Improvement) Parent: FLINK-10598 > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-8354: Issue Type: Sub-task (was: Bug) Parent: FLINK-10598 > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced notion of Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. > But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores > headers when consuming kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656102#comment-16656102 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r226506688 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 Review comment: Yes, the results "coincidentally" turn out to be null, but I still suggest explicitly returning a `null` value instead of a normal result. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656099#comment-16656099 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r226506284 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 +} else { + str.charAt(0).toByte.toInt Review comment: Could you explain more about this behavior? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656097#comment-16656097 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r226506162 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testChr(): Unit = { +testAllApis( + 'f14.chr(), + "f14.chr()", + "CHR(f14)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f36.chr(), + "f36.chr()", + "CHR(f36)", + "A") + +testAllApis( + 'f37.chr(), + "f37.chr()", + "CHR(f37)", + "a") + +testAllApis( + 'f38.chr(), + "f38.chr()", + "CHR(f38)", + "ÿ") + +testAllApis( Review comment: This is duplicated with a test case above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655692#comment-16655692 ] Fabian Hueske commented on FLINK-10566: --- This issue only affects DataSet programs. The DataStream API does not optimize the plans. To be honest, this might be a major issue. The optimizer has not been developed for quite some time (besides some bugfixes) and changing the plan enumeration logic is not really easy. One thing that might work is to run the optimizer for subplans and connect the optimal subplans. However, we would need to identify the edges at which we want to split the plan. One challenge here is that the DataSet API allows for branching and merging plans (in contrast to most relational optimizers). What you could also try is to pass optimizer hints yourself. That would reduce the search space because the execution strategies would be fixed. > Flink Planning is exponential in the number of stages > - > > Key: FLINK-10566 > URL: https://issues.apache.org/jira/browse/FLINK-10566 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.5.4 >Reporter: Robert Bradshaw >Priority: Major > Attachments: chart.png > > > This makes it nearly impossible to run graphs with 100 or more stages. (The > execution itself is still sub-second, but the job submission takes > increasingly long.) > I can reproduce this with the following pipeline, which resembles my > real-world workloads (with depth up to 10 and width up to, and past, 50). On > Flink it seems getting beyond width 10 is problematic (times out after > hours). Note the log scale on the chart for time. 
>
> {code:java}
> public static void runPipeline(int depth, int width) throws Exception {
>   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   DataSet<String> input = env.fromElements("a", "b", "c");
>   DataSet<String> stats = null;
>   for (int i = 0; i < depth; i++) {
>     stats = analyze(input, stats, width / (i + 1) + 1);
>   }
>   stats.writeAsText("out.txt");
>   env.execute("depth " + depth + " width " + width);
> }
>
> public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
>   System.out.println("analyze " + branches);
>   for (int i = 0; i < branches; i++) {
>     final int ii = i;
>     if (stats != null) {
>       input = input.map(new RichMapFunction<String, String>() {
>         @Override
>         public void open(Configuration parameters) throws Exception {
>           Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
>         }
>
>         @Override
>         public String map(String value) throws Exception {
>           return value;
>         }
>       }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>     }
>     DataSet<String> branch = input
>       .map(s -> new Tuple2<Integer, String>(0, s + ii))
>       .groupBy(0)
>       .minBy(1)
>       .map(kv -> kv.f1);
>     if (stats == null) {
>       stats = branch;
>     } else {
>       stats = stats.union(branch);
>     }
>   }
>   return stats.map(s -> "(" + s + ").stats");
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655683#comment-16655683 ] Maximilian Michels commented on FLINK-10566: Just to clarify, this happens during Plan creation, which is specific to batch execution. Afterwards, the Plan is converted into a JobGraph which is sent to the JobMaster for scheduling. [~robertwb] Have you run a similar experiment for the DataStream API? Asking because streaming has a different way of creating the JobGraph. [~fhueske] or [~till.rohrmann] Do you have an idea how we could fix this? This blocks us from running TFX pipelines on Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655681#comment-16655681 ] Maximilian Michels commented on FLINK-10566: Here's a thread dump: {noformat} at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281) at org.apache.flink.api.common.
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655663#comment-16655663 ] ASF GitHub Bot commented on FLINK-9697: --- alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-431100016 @yanghua, FLINK-8354 (headers) and FLINK-8500 (timestamp), but seems I don't have permissions to change anything in these Jira issues This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka
alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-431100016 @yanghua, FLINK-8354 (headers) and FLINK-8500 (timestamp), but seems I don't have permissions to change anything in these Jira issues This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase
tillrohrmann commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#discussion_r226396417
## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
## @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
 /**
  * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
  */
+ @Ignore
 @Test(timeout = 100000) // timeout after 100 seconds
 public void testTaskManagerFailure() throws Exception {
- assumeTrue("The new mode does not start TMs upfront.", !isNewMode);

Review comment: I think the problem is that another precondition was violated, which caused a `MultipleFailureException` to be thrown, and because of that the `AssumptionViolatedException` was not properly treated as ignoring the respective test case.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655636#comment-16655636 ] ASF GitHub Bot commented on FLINK-10527: tillrohrmann commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#discussion_r226396417
## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
## @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
 /**
  * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
  */
+ @Ignore
 @Test(timeout = 100000) // timeout after 100 seconds
 public void testTaskManagerFailure() throws Exception {
- assumeTrue("The new mode does not start TMs upfront.", !isNewMode);

Review comment: I think the problem is that another precondition was violated, which caused a `MultipleFailureException` to be thrown, and because of that the `AssumptionViolatedException` was not properly treated as ignoring the respective test case.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
> Issue Type: Sub-task
> Components: YARN
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in that PR. Currently it has three usage scenarios:
> 1. In an assertion, where it caused an error:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. In if (!isNewMode) blocks, whose logic would never have been invoked, so the if block can be removed.
> 3. In if (isNewMode) blocks, which have always been invoked, so the if statement can be removed.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
tillrohrmann commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys URL: https://github.com/apache/flink/pull/6872#issuecomment-431093823 At the moment `rest.address` is effectively `dispatcher.rest.address`. The problem I see currently is that other components might already depend on `RestOption#ADDRESS`. If we now introduce a `RestOption#DISPATCHER_ADDRESS` with the fallback to `RestOption#ADDRESS` and `JobManagerOptions#ADDRESS` and remove the latter as a deprecated key from the former, then we break setups. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
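The key-resolution order under discussion (a primary key shadowing fallback and deprecated keys) can be modelled with a small sketch; the class and method names here are illustrative assumptions, not the actual `ConfigOption` API:

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of config lookup with fallback keys: the primary key
// wins, then fallback/deprecated keys are consulted in order, then the
// default applies. Illustrates the discussed semantics only; this is not
// Flink's ConfigOption implementation.
public class FallbackKeySketch {
    public static String resolve(Map<String, String> conf,
                                 String primaryKey,
                                 List<String> fallbackKeys,
                                 String defaultValue) {
        if (conf.containsKey(primaryKey)) {
            return conf.get(primaryKey);
        }
        for (String key : fallbackKeys) {
            if (conf.containsKey(key)) {
                return conf.get(key);
            }
        }
        return defaultValue;
    }
}
```

Under these semantics, introducing a new primary key with the old key as a fallback preserves existing setups, whereas dropping the old key outright breaks them — which is the concern raised in the comment above.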
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655627#comment-16655627 ] ASF GitHub Bot commented on FLINK-10436: tillrohrmann commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys URL: https://github.com/apache/flink/pull/6872#issuecomment-431093823 At the moment `rest.address` is effectively `dispatcher.rest.address`. The problem I see currently is that other components might already depend on `RestOption#ADDRESS`. If we now introduce a `RestOption#DISPATCHER_ADDRESS` with the fallback to `RestOption#ADDRESS` and `JobManagerOptions#ADDRESS` and remove the latter as a deprecated key from the former, then we break setups. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Example config uses deprecated key jobmanager.rpc.address > - > > Key: FLINK-10436 > URL: https://issues.apache.org/jira/browse/FLINK-10436 > Project: Flink > Issue Type: Sub-task > Components: Startup Shell Scripts >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The example {{flink-conf.yaml}} shipped as part of the Flink distribution > (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml) > has the following entry: > {code} > jobmanager.rpc.address: localhost > {code} > When using this key, the following deprecation warning is logged. > {code} > 2018-09-26 12:01:46,608 WARN org.apache.flink.configuration.Configuration > - Config uses deprecated configuration key > 'jobmanager.rpc.address' instead of proper key 'rest.address' > {code} > The example config should not use deprecated config options. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests
[ https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655620#comment-16655620 ] TisonKun edited comment on FLINK-10569 at 10/18/18 5:22 PM: I find this topic is more complex than I ever thought. We actually have two version of slot implementation and current {{ExecutionGraph}} tests heavily depend on legacy slot implementation. Maybe we should separate this thread into stepwise issues decouples existing valid tests with legacy slot implementation(as well as Scheduler and Instance). cc [~till.rohrmann]. was (Author: tison): I find this topic is more complex than I ever thought. We actually have two version of slot implementation and current {{ExecutionGraph}} tests heavily depend on legacy slot implementation. Maybe we should separated this thread into stepwise issues decouples existing valid tests with legacy slot implementation(as well as Scheduler and Instance). cc [~till.rohrmann]. > Clean up uses of Scheduler and Instance in valid tests > -- > > Key: FLINK-10569 > URL: https://issues.apache.org/jira/browse/FLINK-10569 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.7.0 > > > Legacy class {{Scheduler}} and {{Instance}} are still used in some valid > tests like {{ExecutionGraphRestartTest}}. We should replace them with FLIP-6 > schedule mode. The best way I can find is use {{SimpleSlotProvider}}. > Note that we need not to remove all use points among all files since most of > them stay in legacy codebase like {{JobManager.scala}} and would be removed > later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests
[ https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655620#comment-16655620 ] TisonKun edited comment on FLINK-10569 at 10/18/18 5:22 PM: I find this topic is more complex than I ever thought. We actually have two version of slot implementation and current {{ExecutionGraph}} tests heavily depend on legacy slot implementation. Maybe we should separated this thread into stepwise issues decouples existing valid tests with legacy slot implementation(as well as Scheduler and Instance). cc [~till.rohrmann]. was (Author: tison): I find this topic is more complex than I ever thought. We actually have two version of slot implementation and current {{ExecutionGraph}} related tests heavily depend on legacy slot implementation. Maybe we should separated this thread into stepwise issues decouples existing valid tests with legacy slot implementation(as well as Scheduler and Instance). cc [~till.rohrmann]. > Clean up uses of Scheduler and Instance in valid tests > -- > > Key: FLINK-10569 > URL: https://issues.apache.org/jira/browse/FLINK-10569 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.7.0 > > > Legacy class {{Scheduler}} and {{Instance}} are still used in some valid > tests like {{ExecutionGraphRestartTest}}. We should replace them with FLIP-6 > schedule mode. The best way I can find is use {{SimpleSlotProvider}}. > Note that we need not to remove all use points among all files since most of > them stay in legacy codebase like {{JobManager.scala}} and would be removed > later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests
[ https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655620#comment-16655620 ] TisonKun commented on FLINK-10569: -- I find this topic is more complex than I thought. We actually have two versions of the slot implementation, and the current {{ExecutionGraph}} related tests heavily depend on the legacy one. Maybe we should separate this thread into stepwise issues that decouple the existing valid tests from the legacy slot implementation (as well as Scheduler and Instance). cc [~till.rohrmann]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10251) Handle oversized response messages in AkkaRpcActor
[ https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655615#comment-16655615 ] ASF GitHub Bot commented on FLINK-10251: tillrohrmann commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor URL: https://github.com/apache/flink/pull/6876#discussion_r226393468 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ## @@ -108,9 +111,11 @@ private volatile boolean stopped; - public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { + public AkkaRpcService(final ActorSystem actorSystem, final Configuration configuration) { Review comment: Yes let's introduce an `AkkaRpcServiceConfiguration`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle oversized response messages in AkkaRpcActor > -- > > Key: FLINK-10251 > URL: https://issues.apache.org/jira/browse/FLINK-10251 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The {{AkkaRpcActor}} should check whether an RPC response which is sent to a > remote sender does not exceed the maximum framesize of the underlying > {{ActorSystem}}. If this is the case we should fail fast instead. We can > achieve this by serializing the response and sending the serialized byte > array. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
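The fail-fast idea described in the issue — serialize the response first, then compare its size against the maximum frame size before sending — can be sketched as follows; the class name, exception type, and limit are illustrative assumptions, not the actual `AkkaRpcActor` code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch of a pre-send frame-size check: serialize the payload, then fail
// fast if the byte count exceeds the transport's maximum frame size.
// The exception type and the limit passed in are illustrative assumptions.
public class FrameSizeCheckSketch {
    public static byte[] serializeChecked(Serializable payload, long maxFrameSize)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        byte[] serialized = bytes.toByteArray();
        if (serialized.length > maxFrameSize) {
            // Fail fast locally instead of letting the transport drop the message.
            throw new IOException("Response of " + serialized.length
                    + " bytes exceeds maximum frame size " + maxFrameSize);
        }
        return serialized;
    }
}
```

The point of serializing before sending is that the sender learns the exact payload size and can raise a descriptive error locally, rather than having the oversized message silently dropped by the transport.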
[GitHub] tillrohrmann commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor
tillrohrmann commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor URL: https://github.com/apache/flink/pull/6876#discussion_r226393468 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ## @@ -108,9 +111,11 @@ private volatile boolean stopped; - public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { + public AkkaRpcService(final ActorSystem actorSystem, final Configuration configuration) { Review comment: Yes let's introduce an `AkkaRpcServiceConfiguration`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10602) Run MetricFetcher in metrics ActorSystem
[ https://issues.apache.org/jira/browse/FLINK-10602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655600#comment-16655600 ] ASF GitHub Bot commented on FLINK-10602: tillrohrmann closed pull request #6877: [FLINK-10602] Use metric's ActorSystem in MetricFetcher URL: https://github.com/apache/flink/pull/6877 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 35fad322ef9..150f17fa04c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; @@ -219,6 +220,9 @@ private void runCluster(Configuration configuration) throws Exception { heartbeatServices, metricRegistry, archivedExecutionGraphStore, + new AkkaQueryServiceRetriever( + metricQueryServiceActorSystem, + Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), this); clusterComponent.getShutDownFuture().whenComplete( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index 6d557d0815f..354245d29ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -46,16 +45,14 @@ import org.apache.flink.runtime.rest.RestEndpointFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +99,7 @@ public AbstractDispatcherResourceManagerComponentFactory( HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, ArchivedExecutionGraphStore archivedExecutionGraphStore, + MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler 
fatalErrorHandler) throws Exception { LeaderRetrievalService dispatcherLeaderRetrievalService = null; @@ -130,10 +128,6 @@ public AbstractDispatcherResourceManagerComponentFactory( 10, Time.milliseconds(50L)); - // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint - final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem(); - final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); -
[jira] [Resolved] (FLINK-10602) Run MetricFetcher in metrics ActorSystem
[ https://issues.apache.org/jira/browse/FLINK-10602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10602. --- Resolution: Fixed Fixed via https://github.com/apache/flink/commit/f6a7100507c4d6eda25c10618f0ad028ffef32d8 > Run MetricFetcher in metrics ActorSystem > > > Key: FLINK-10602 > URL: https://issues.apache.org/jira/browse/FLINK-10602 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After letting the {{MetricQueryService}} run into a separate {{ActorSystem}}, > we should also let the {{MetricFetcher}} use the same {{ActorSystem}} to > fetch the metrics. That way it will be possible to configure the metric > {{ActorSystem}} different from the {{RpcService}} {{ActorSystem}} (e.g. > different frame size). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann closed pull request #6877: [FLINK-10602] Use metric's ActorSystem in MetricFetcher
tillrohrmann closed pull request #6877: [FLINK-10602] Use metric's ActorSystem in MetricFetcher URL: https://github.com/apache/flink/pull/6877 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 35fad322ef9..150f17fa04c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; @@ -219,6 +220,9 @@ private void runCluster(Configuration configuration) throws Exception { heartbeatServices, metricRegistry, archivedExecutionGraphStore, + new AkkaQueryServiceRetriever( + metricQueryServiceActorSystem, + Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), this); clusterComponent.getShutDownFuture().whenComplete( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index 6d557d0815f..354245d29ba 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -46,16 +45,14 @@ import org.apache.flink.runtime.rest.RestEndpointFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; -import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import akka.actor.ActorSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +99,7 @@ public AbstractDispatcherResourceManagerComponentFactory( HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, ArchivedExecutionGraphStore archivedExecutionGraphStore, + MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception { LeaderRetrievalService dispatcherLeaderRetrievalService = null; @@ -130,10 +128,6 @@ public AbstractDispatcherResourceManagerComponentFactory( 10, 
Time.milliseconds(50L)); - // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint - final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem(); - final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); - webMonitorEndpoint = restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, @@ -143,7 +137,7 @@ public AbstractDispatcherResourceManagerCompone
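The motivation above — running the MetricFetcher in the metrics ActorSystem so its frame size can be configured independently of the RPC ActorSystem — hinges on the akka.framesize-style byte-count setting. A minimal, self-contained sketch of parsing such a value (hypothetical helper, not the actual Flink/Akka parser; Akka accepts richer unit suffixes than shown here):

```java
// Hedged sketch: parse an akka.framesize-style string such as "10485760b"
// into a byte count. Real Akka config parsing supports more unit suffixes.
public class FrameSizeParser {

    // Strips a trailing 'b' (bytes) suffix, if present, and parses the number.
    static long parseBytes(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("b")) {
            v = v.substring(0, v.length() - 1);
        }
        return Long.parseLong(v);
    }

    public static void main(String[] args) {
        // "10485760b" (10 MiB) is the commonly cited akka.framesize default.
        System.out.println(parseBytes("10485760b")); // prints 10485760
    }
}
```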
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655598#comment-16655598 ] ASF GitHub Bot commented on FLINK-10436: zentol commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys URL: https://github.com/apache/flink/pull/6872#issuecomment-431087755 I understood what the fallback is supposed to do, but it doesn't make sense that something generic like `rest.address` uses the very specific `jobmanager.rpc.address` as a fall-back. Falling back to a _more specific_ option is conceptually not sound. If you want to make the fallback behavior re-usable I'd suggest a dedicated `dispatcher.rest.port` option with fall-backs to `rest.port` and `jobmanager.rpc.address` which you then use to configure the options that the `RestEndpoint` expects. This way you retain the convenient fallback behavior without coupling the rest configuration to any other component. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Example config uses deprecated key jobmanager.rpc.address > - > > Key: FLINK-10436 > URL: https://issues.apache.org/jira/browse/FLINK-10436 > Project: Flink > Issue Type: Sub-task > Components: Startup Shell Scripts >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The example {{flink-conf.yaml}} shipped as part of the Flink distribution > (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml) > has the following entry: > {code} > jobmanager.rpc.address: localhost > {code} > When using this key, the following deprecation warning is logged. 
> {code} > 2018-09-26 12:01:46,608 WARN org.apache.flink.configuration.Configuration > - Config uses deprecated configuration key > 'jobmanager.rpc.address' instead of proper key 'rest.address' > {code} > The example config should not use deprecated config options. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
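The fallback-key lookup order debated in this thread can be sketched as follows. This is a hypothetical, simplified illustration (plain maps, not the actual Flink ConfigOption/FallbackKey API): the proper key wins, then each fallback key is consulted in order, and a deprecated fallback would trigger a warning.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of fallback-key resolution; names are illustrative only.
public class FallbackLookup {

    // Returns the value for `key`, or the first matching fallback, or null.
    static String resolve(Map<String, String> conf, String key, List<String> fallbacks) {
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        for (String fallback : fallbacks) {
            if (conf.containsKey(fallback)) {
                // A real implementation would log a deprecation warning here
                // when the fallback key is a deprecated one.
                return conf.get(fallback);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("jobmanager.rpc.address", "localhost");
        // "rest.address" is unset, so the fallback key supplies the value.
        System.out.println(resolve(conf, "rest.address",
                Arrays.asList("jobmanager.rpc.address"))); // prints localhost
    }
}
```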
[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException
[ https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1663#comment-1663 ] ASF GitHub Bot commented on FLINK-10309: GJL commented on issue #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations URL: https://github.com/apache/flink/pull/6785#issuecomment-431078873 @zentol Thanks for reviewing. I will keep the refactoring in a separate commit. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cancel flink job occurs java.net.ConnectException > - > > Key: FLINK-10309 > URL: https://issues.apache.org/jira/browse/FLINK-10309 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, REST >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: vinoyang >Assignee: Gary Yao >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The problem occurs when using the Yarn per-job detached mode. Trying to > cancel with savepoint fails with the following exception before being able to > retrieve the savepoint path: > exception stack trace : > {code:java} > org.apache.flink.util.FlinkException: Could not cancel job . > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960) > at > org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not > complete the operation. Number of retries has been exhausted. 
> at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398) > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583) > ... 6 more > Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: > Could not complete the operation. Number of retries has been exhausted. > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at > org.apache.flink.shaded.netty4.
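The "Number of retries has been exhausted" failure in the stack trace above comes from a retry-with-exhaustion pattern. A simplified, synchronous sketch of that pattern (illustrative only — Flink's FutureUtils.retryOperationWithDelay is asynchronous and delay-based):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hedged sketch: retry an operation up to maxRetries + 1 attempts and
// complete exceptionally once all attempts have failed.
public class RetrySketch {

    static CompletableFuture<String> retry(Supplier<String> op, int maxRetries) {
        CompletableFuture<String> result = new CompletableFuture<>();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                result.complete(op.get());
                return result;
            } catch (RuntimeException e) {
                // attempt failed; fall through and retry
            }
        }
        result.completeExceptionally(
                new RuntimeException("Could not complete the operation. "
                        + "Number of retries has been exhausted."));
        return result;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice (simulating connection refusals), succeeds on the third try.
        CompletableFuture<String> f = retry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("java.net.ConnectException: refused");
            }
            return "ok";
        }, 5);
        System.out.println(f.join()); // prints ok
    }
}
```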
[jira] [Commented] (FLINK-10555) Port AkkaSslITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655539#comment-16655539 ] ASF GitHub Bot commented on FLINK-10555: TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base URL: https://github.com/apache/flink/pull/6849#issuecomment-431075922 @tillrohrmann yes I think those are covered by the mentioned existing tests. Moving negative tests to `BlobServerSslTest` as suggested. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AkkaSslITCase to new code base > --- > > Key: FLINK-10555 > URL: https://issues.apache.org/jira/browse/FLINK-10555 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AkkaSslITCase}} to new code base, as {{MiniClusterSslITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme
[ https://issues.apache.org/jira/browse/FLINK-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655529#comment-16655529 ] ASF GitHub Bot commented on FLINK-10563: aljoscha commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme URL: https://github.com/apache/flink/pull/6855#issuecomment-431074098 will update both, thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Expose shaded Presto S3 filesystem under "s3p" scheme > - > > Key: FLINK-10563 > URL: https://issues.apache.org/jira/browse/FLINK-10563 > Project: Flink > Issue Type: Improvement >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, you can't use the shaded hadoop S3 filesystem and the presto S3 > filesystem at the same time. If we exposed the presto filesystem under an > additional scheme we enable using both at the same time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655509#comment-16655509 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#issuecomment-431070571 @xccui Any more review comment? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
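The semantics proposed for the ASCII/CHR functions can be illustrated in plain Java (hypothetical helpers, not the actual Flink table-function implementation): ASCII returns the code of a string's first character, per the linked MySQL docs, and CHR maps a code back to a one-character string.

```java
// Hedged sketch of the ASCII/CHR semantics discussed in FLINK-9970.
public class AsciiChr {

    // Code of the first character; 0 for the empty string (MySQL convention).
    static int ascii(String s) {
        return s.isEmpty() ? 0 : s.charAt(0);
    }

    // One-character string for the given code (assumed to be a valid char code).
    static String chr(int code) {
        return String.valueOf((char) code);
    }

    public static void main(String[] args) {
        System.out.println(ascii("Abc")); // prints 65
        System.out.println(chr(65));      // prints A
    }
}
```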
[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655493#comment-16655493 ] ASF GitHub Bot commented on FLINK-10384: pnowojski closed pull request #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md index 37171a2079f..e47b31772b3 100644 --- a/docs/dev/table/functions.md +++ b/docs/dev/table/functions.md @@ -1197,6 +1197,18 @@ SIN(numeric) + + +{% highlight text %} +SINH(numeric) +{% endhighlight %} + + +Returns the hyperbolic sine of numeric. +The return type is DOUBLE. + + + {% highlight text %} @@ -1656,6 +1668,18 @@ NUMERIC.sin() + + +{% highlight java %} +NUMERIC.sinh() +{% endhighlight %} + + +Returns the hyperbolic sine of NUMERIC. +The return type is DOUBLE. + + + {% highlight java %} @@ -2116,6 +2140,18 @@ NUMERIC.sin() + + +{% highlight scala %} +NUMERIC.sinh() +{% endhighlight %} + + +Returns the hyperbolic sine of NUMERIC. +The return type is DOUBLE. 
+ + + {% highlight scala %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 7c585674b54..f9fb93cb7fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -357,6 +357,11 @@ trait ImplicitExpressionOperations { */ def floor() = Floor(expr) + /** +* Calculates the hyperbolic sine of a given value. +*/ + def sinh() = Sinh(expr) + /** * Calculates the smallest integer greater than or equal to a given number. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index 1ae6e39e073..8abe55db8d5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -84,6 +84,9 @@ object BuiltInMethods { val ACOS = Types.lookupMethod(classOf[Math], "acos", classOf[Double]) val ACOS_DEC = Types.lookupMethod(classOf[SqlFunctions], "acos", classOf[JBigDecimal]) + val SINH = Types.lookupMethod(classOf[Math], "sinh", classOf[Double]) + val SINH_DEC = Types.lookupMethod(classOf[ScalarFunctions], "sinh", classOf[JBigDecimal]) + val ATAN = Types.lookupMethod(classOf[Math], "atan", classOf[Double]) val ATAN_DEC = Types.lookupMethod(classOf[SqlFunctions], "atan", classOf[JBigDecimal]) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 9a6aeb15b3b..c7070400c9f 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -241,6 +241,18 @@ object FunctionGenerator { DOUBLE_TYPE_INFO, BuiltInMethods.LN) + addSqlFunctionMethod( +SINH, +Seq(DOUBLE_TYPE_INFO), +DOUBLE_TYPE_INFO, +BuiltInMethods.SINH) + + addSqlFunctionMethod( +SINH, +Seq(BIG_DEC_TYPE_INFO), +DOUBLE_TYPE_INFO, +BuiltInMethods.SINH_DEC) + addSqlFunctionMethod( EXP, Seq(DOUBLE_TYPE_INFO), diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala index 97e7190c0eb..05539deb711 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala @@ -165,6 +165,20 @@ case class Power(left: Expression, right: Expression) extends BinaryExpression w } } +case class S
[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655492#comment-16655492 ] ASF GitHub Bot commented on FLINK-10384: pnowojski commented on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730#issuecomment-431066934 Thanks @yanghua :) Merging This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Sinh math function supported in Table API and SQL > - > > Key: FLINK-10384 > URL: https://issues.apache.org/jira/browse/FLINK-10384 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > like FLINK-10340 for adding Cosh math function -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski closed pull request #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL
pnowojski closed pull request #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md index 37171a2079f..e47b31772b3 100644 --- a/docs/dev/table/functions.md +++ b/docs/dev/table/functions.md @@ -1197,6 +1197,18 @@ SIN(numeric) + + +{% highlight text %} +SINH(numeric) +{% endhighlight %} + + +Returns the hyperbolic sine of numeric. +The return type is DOUBLE. + + + {% highlight text %} @@ -1656,6 +1668,18 @@ NUMERIC.sin() + + +{% highlight java %} +NUMERIC.sinh() +{% endhighlight %} + + +Returns the hyperbolic sine of NUMERIC. +The return type is DOUBLE. + + + {% highlight java %} @@ -2116,6 +2140,18 @@ NUMERIC.sin() + + +{% highlight scala %} +NUMERIC.sinh() +{% endhighlight %} + + +Returns the hyperbolic sine of NUMERIC. +The return type is DOUBLE. + + + {% highlight scala %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 7c585674b54..f9fb93cb7fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -357,6 +357,11 @@ trait ImplicitExpressionOperations { */ def floor() = Floor(expr) + /** +* Calculates the hyperbolic sine of a given value. +*/ + def sinh() = Sinh(expr) + /** * Calculates the smallest integer greater than or equal to a given number. 
*/ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index 1ae6e39e073..8abe55db8d5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -84,6 +84,9 @@ object BuiltInMethods { val ACOS = Types.lookupMethod(classOf[Math], "acos", classOf[Double]) val ACOS_DEC = Types.lookupMethod(classOf[SqlFunctions], "acos", classOf[JBigDecimal]) + val SINH = Types.lookupMethod(classOf[Math], "sinh", classOf[Double]) + val SINH_DEC = Types.lookupMethod(classOf[ScalarFunctions], "sinh", classOf[JBigDecimal]) + val ATAN = Types.lookupMethod(classOf[Math], "atan", classOf[Double]) val ATAN_DEC = Types.lookupMethod(classOf[SqlFunctions], "atan", classOf[JBigDecimal]) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 9a6aeb15b3b..c7070400c9f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -241,6 +241,18 @@ object FunctionGenerator { DOUBLE_TYPE_INFO, BuiltInMethods.LN) + addSqlFunctionMethod( +SINH, +Seq(DOUBLE_TYPE_INFO), +DOUBLE_TYPE_INFO, +BuiltInMethods.SINH) + + addSqlFunctionMethod( +SINH, +Seq(BIG_DEC_TYPE_INFO), +DOUBLE_TYPE_INFO, +BuiltInMethods.SINH_DEC) + addSqlFunctionMethod( EXP, Seq(DOUBLE_TYPE_INFO), diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala index 97e7190c0eb..05539deb711 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala @@ -165,6 +165,20 @@ case class Power(left: Expression, right: Expression) extends BinaryExpression w } } +case class Sinh(child: Expression) extends UnaryExpression { + + override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO; + + override private[flink] def validateInput(): ValidationResult = +TypeCheckUtils.assertNumericExpr(child.resultType, "
[GitHub] pnowojski commented on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL
pnowojski commented on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730#issuecomment-431066934 Thanks @yanghua :) Merging This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
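The diff above wires SINH through to java.lang.Math.sinh. As a sanity check of what the new function computes, the hyperbolic sine satisfies sinh(x) = (e^x - e^-x) / 2:

```java
// Verifies that Math.sinh agrees with the exponential definition of sinh.
public class SinhExample {
    public static void main(String[] args) {
        double x = 1.0;
        double viaMath = Math.sinh(x);
        double viaDefinition = (Math.exp(x) - Math.exp(-x)) / 2.0;
        System.out.println(Math.abs(viaMath - viaDefinition) < 1e-12); // prints true
    }
}
```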
[jira] [Commented] (FLINK-10251) Handle oversized response messages in AkkaRpcActor
[ https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655486#comment-16655486 ] ASF GitHub Bot commented on FLINK-10251: yanghua commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor URL: https://github.com/apache/flink/pull/6876#discussion_r226366888 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ## @@ -108,9 +111,11 @@ private volatile boolean stopped; - public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { + public AkkaRpcService(final ActorSystem actorSystem, final Configuration configuration) { Review comment: Sorry, I forgot to refactor this code segment : ``` if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) { maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH); } else { // only local communication maximumFramesize = Long.MAX_VALUE; } ``` to use ``` configuration.getString(AkkaOptions.FRAMESIZE); ``` (this suggestion from @zentol) If I would do this, I think it's better to replace scattered config items into one Configuration object so that we can encapsulate change? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle oversized response messages in AkkaRpcActor > -- > > Key: FLINK-10251 > URL: https://issues.apache.org/jira/browse/FLINK-10251 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The {{AkkaRpcActor}} should check whether an RPC response which is sent to a > remote sender does not exceed the maximum framesize of the underlying > {{ActorSystem}}. If this is the case we should fail fast instead. We can > achieve this by serializing the response and sending the serialized byte > array. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
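The fail-fast behavior described in the FLINK-10251 ticket — serialize the response and reject it before sending if it exceeds the maximum frame size — can be sketched with plain Java serialization (illustrative only; the actual AkkaRpcActor uses Flink's own serialization utilities and configuration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hedged sketch: serialize a response and fail fast if the serialized form
// would exceed the transport's maximum frame size.
public class FrameSizeCheck {

    static byte[] serializeWithLimit(Serializable response, long maxFrameSize) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(response);
            }
            byte[] bytes = bos.toByteArray();
            if (bytes.length > maxFrameSize) {
                throw new IllegalStateException("Serialized response of "
                        + bytes.length + " bytes exceeds maximum frame size "
                        + maxFrameSize);
            }
            return bytes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A small payload fits comfortably under a 10 MiB limit.
        System.out.println(serializeWithLimit("small payload", 10_485_760L).length > 0); // prints true
        try {
            // ~4 MB of ints against a 1 KiB limit: rejected before sending.
            serializeWithLimit(new int[1_000_000], 1024L);
        } catch (IllegalStateException e) {
            System.out.println("rejected"); // prints rejected
        }
    }
}
```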
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655481#comment-16655481 ] ASF GitHub Bot commented on FLINK-10436:
TisonKun commented on a change in pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226366382
## File path: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ##
```
@@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
 	if (this.confData.containsKey(configOption.key())) {
 		return true;
 	}
-	else if (configOption.hasDeprecatedKeys()) {
-		// try the deprecated keys
-		for (String deprecatedKey : configOption.deprecatedKeys()) {
-			if (this.confData.containsKey(deprecatedKey)) {
-				LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-					deprecatedKey, configOption.key());
+	else if (configOption.hasFallbackKeys()) {
+		// try the fallback keys
+		for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+			if (this.confData.containsKey(fallbackKey.getKey())) {
+				if (fallbackKey.isDeprecated()) {
+					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+						fallbackKey.getKey(), configOption.key());
```
Review comment: Oh, I see the repetition; I will do as suggested, sorry :P
> Example config uses deprecated key jobmanager.rpc.address
> -
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
> Issue Type: Sub-task
> Components: Startup Shell Scripts
> Affects Versions: 1.7.0
> Reporter: Ufuk Celebi
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml) has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
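The lookup order in the `Configuration#contains` diff above (primary key first, then each fallback key, warning only when the matching fallback is deprecated) can be sketched as follows. `FallbackLookup` and its nested `FallbackKey` are simplified stand-ins for Flink's real classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Minimal sketch of the fallback-key lookup order from the review diff:
 * the primary key wins; otherwise the first configured fallback key
 * matches, logging a warning only if that key is marked deprecated.
 */
public class FallbackLookup {

    /** Simplified stand-in for the FallbackKey class added by the PR. */
    static final class FallbackKey {
        final String key;
        final boolean deprecated;

        FallbackKey(String key, boolean deprecated) {
            this.key = key;
            this.deprecated = deprecated;
        }
    }

    private final Map<String, String> confData = new HashMap<>();

    void put(String key, String value) {
        confData.put(key, value);
    }

    boolean contains(String primaryKey, List<FallbackKey> fallbacks) {
        if (confData.containsKey(primaryKey)) {
            return true;
        }
        for (FallbackKey fk : fallbacks) {
            if (confData.containsKey(fk.key)) {
                if (fk.deprecated) {
                    // only deprecated fallbacks warn; plain fallbacks stay silent
                    System.out.printf(
                        "Config uses deprecated configuration key '%s' instead of proper key '%s'%n",
                        fk.key, primaryKey);
                }
                return true;
            }
        }
        return false;
    }
}
```

This mirrors why the PR fixes FLINK-10436: `jobmanager.rpc.address` can remain a non-deprecated fallback for `rest.address`, so using it no longer triggers the deprecation warning.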
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655473#comment-16655473 ] ASF GitHub Bot commented on FLINK-10436:
TisonKun commented on a change in pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226364750
## File path: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ##
```
@@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
 	if (this.confData.containsKey(configOption.key())) {
 		return true;
 	}
-	else if (configOption.hasDeprecatedKeys()) {
-		// try the deprecated keys
-		for (String deprecatedKey : configOption.deprecatedKeys()) {
-			if (this.confData.containsKey(deprecatedKey)) {
-				LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-					deprecatedKey, configOption.key());
+	else if (configOption.hasFallbackKeys()) {
+		// try the fallback keys
+		for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+			if (this.confData.containsKey(fallbackKey.getKey())) {
+				if (fallbackKey.isDeprecated()) {
+					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+						fallbackKey.getKey(), configOption.key());
```
Review comment: Does it mean extracting the LOG statement into a method like the one below?
```java
private void warningDeprecated(...) {
	LOG.warn(...);
}
```
I wonder what value it provides, or whether it is just a style convention to follow. I don't see another code block where a single logging statement is wrapped in a method.
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655468#comment-16655468 ] ASF GitHub Bot commented on FLINK-10436:
TisonKun commented on a change in pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226363659
## File path: flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java ##
```
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A key with FallbackKeys will fall back to the FallbackKeys if it itself is not configured.
+ */
+public class FallbackKey {
+
+	private final String key;
+
+	private final boolean isDeprecated;
+
+	public String getKey() {
+		return key;
+	}
+
+	public boolean isDeprecated() {
+		return isDeprecated;
+	}
+
+	FallbackKey(String key, boolean isDeprecated) {
```
Review comment: Sounds great.
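The constructor review above points toward hiding the boolean flag behind named factory methods, so a deprecated key and a plain fallback key cannot be confused at the call site. A sketch of that shape; the factory method names are assumptions, not necessarily what the final PR uses:

```java
/**
 * Sketch of a FallbackKey with static factories instead of a public
 * constructor taking a raw boolean: each factory names its intent, so
 * the isDeprecated flag cannot be set inconsistently by callers.
 */
public final class FallbackKeySketch {

    private final String key;

    private final boolean isDeprecated;

    private FallbackKeySketch(String key, boolean isDeprecated) {
        this.key = key;
        this.isDeprecated = isDeprecated;
    }

    /** A key that merely provides a default (fallback) value for the primary key. */
    public static FallbackKeySketch createFallbackKey(String key) {
        return new FallbackKeySketch(key, false);
    }

    /** A key kept only for backwards compatibility; using it warns the user. */
    public static FallbackKeySketch createDeprecatedKey(String key) {
        return new FallbackKeySketch(key, true);
    }

    public String getKey() {
        return key;
    }

    public boolean isDeprecated() {
        return isDeprecated;
    }
}
```

With this shape, `withFallbackKeys` and `withDeprecatedKeys` on `ConfigOption` can each call the matching factory, and the boolean never appears in user-facing code.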
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655470#comment-16655470 ] ASF GitHub Bot commented on FLINK-10436:
TisonKun commented on a change in pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226363901
## File path: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ##
```
@@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
 	if (this.confData.containsKey(configOption.key())) {
 		return true;
 	}
-	else if (configOption.hasDeprecatedKeys()) {
-		// try the deprecated keys
-		for (String deprecatedKey : configOption.deprecatedKeys()) {
-			if (this.confData.containsKey(deprecatedKey)) {
-				LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-					deprecatedKey, configOption.key());
+	else if (configOption.hasFallbackKeys()) {
+		// try the fallback keys
+		for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+			if (this.confData.containsKey(fallbackKey.getKey())) {
+				if (fallbackKey.isDeprecated()) {
```
Review comment: Agreed.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655451#comment-16655451 ] ASF GitHub Bot commented on FLINK-10252:
yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226358701
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
```
@@ -124,50 +124,86 @@ public MetricSerializationResult serialize(
 		Map> counters,
 		Map, Tuple2> gauges,
 		Map> histograms,
-		Map> meters) {
+		Map> meters,
+		long maximumFramesize,
+		MetricQueryService queryService) {
 	buffer.clear();
+	boolean unregisterRemainingMetrics = false;
 	int numCounters = 0;
 	for (Map.Entry> entry : counters.entrySet()) {
+		if (unregisterRemainingMetrics) {
+			queryService.unregister(entry.getKey());
+			continue;
+		}
+
 		try {
 			serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 			numCounters++;
+			if (buffer.length() > maximumFramesize) {
+				unregisterRemainingMetrics = true;
+			}
 		} catch (Exception e) {
 			LOG.debug("Failed to serialize counter.", e);
+		}
 		}
 	int numGauges = 0;
 	for (Map.Entry, Tuple2> entry : gauges.entrySet()) {
+		if (unregisterRemainingMetrics) {
+			queryService.unregister(entry.getKey());
+			continue;
+		}
+
 		try {
 			serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 			numGauges++;
+			if (buffer.length() > maximumFramesize) {
+				unregisterRemainingMetrics = true;
+			}
 		} catch (Exception e) {
 			LOG.debug("Failed to serialize gauge.", e);
 		}
 	}
-	int numHistograms = 0;
-	for (Map.Entry> entry : histograms.entrySet()) {
-		try {
-			serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
-			numHistograms++;
-		} catch (Exception e) {
-			LOG.debug("Failed to serialize histogram.", e);
-		}
-	}
-
 	int numMeters = 0;
 	for (Map.Entry> entry : meters.entrySet()) {
+		if (unregisterRemainingMetrics) {
+			queryService.unregister(entry.getKey());
+			continue;
+		}
+
 		try {
 			serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 			numMeters++;
+			if (buffer.length() > maximumFramesize) {
+				unregisterRemainingMetrics = true;
+			}
 		} catch (Exception e) {
 			LOG.debug("Failed to serialize meter.", e);
 		}
 	}
+	int numHistograms = 0;
+	for (Map.Entry> entry : histograms.entrySet()) {
+		if (unregisterRemainingMetrics) {
+			queryService.unregister(entry.getKey());
+			continue;
+		}
+
+		try {
+			serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+			numHistograms++;
+			if (buffer.length() > maximumFramesize) {
```
Review comment: OK, I will try to separate the byte array.
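The serialization change above stops writing once the buffer grows past the maximum frame size and unregisters every remaining metric instead of serializing it. A simplified, self-contained sketch of that capping loop, using a single counter map and a byte buffer in place of Flink's `MetricDumpSerialization` internals (all names here are stand-ins):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Sketch of size-capped metric serialization: entries are written until the
 * buffer exceeds maxFramesize; everything after that point is skipped and
 * reported back (the PR unregisters those metrics from the query service).
 */
public class CappedMetricSerializer {

    /** Serializes counter values until the cap is hit; returns the names of skipped metrics. */
    public static List<String> serialize(
            Map<String, Long> counters,
            long maxFramesize,
            ByteArrayOutputStream buffer) throws IOException {
        List<String> skipped = new ArrayList<>();
        DataOutputStream out = new DataOutputStream(buffer);
        boolean capped = false;
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            if (capped) {
                skipped.add(e.getKey()); // stand-in for queryService.unregister(...)
                continue;
            }
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
            if (buffer.size() > maxFramesize) {
                capped = true; // every later entry is dropped, not serialized
            }
        }
        out.flush();
        return skipped;
    }
}
```

Note that, as in the diff, one entry may overshoot the limit before the flag flips; the cap bounds the payload at roughly one metric past `maxFramesize` rather than strictly below it, which is why the review discusses separating the byte array per metric type.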
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address
[ https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655450#comment-16655450 ] ASF GitHub Bot commented on FLINK-10436:
TisonKun commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431058218
The reason we need a fallback key is that keyA and keyB (with keyA falling back to keyB) are conceptually not the same thing, or keyB succeeds keyA. It is just that, in this context, we use the configured keyB as a default (fallback) value for keyA. And I agree that if a key's semantics change, we should update the fallback dependency on demand.
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655447#comment-16655447 ] ASF GitHub Bot commented on FLINK-10527:
yanghua commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226357341
## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ##
```
@@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
 	/**
 	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
 	 */
+	@Ignore
 	@Test(timeout = 100000) // timeout after 100 seconds
 	public void testTaskManagerFailure() throws Exception {
-		assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
```
Review comment: It's here: https://api.travis-ci.org/v3/job/439612829/log.txt. There seems to be another reason for the failure, but this error message is logged in the log file.
> Cleanup constant isNewMode in YarnTestBase
> -
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
> Issue Type: Sub-task
> Components: YARN
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in that PR. Currently it has three usage scenarios:
> 1. assert, which causes an error:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode): the logic in the block is never invoked, so the if block can be removed.
> 3. if (isNewMode): always invoked, so the if statement can be removed.