[jira] [Commented] (FLINK-10474) Don't translate IN to JOIN with VALUES for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642878#comment-16642878 ] Piotr Nowojski commented on FLINK-10474:

Sorry for jumping into the discussion a bit late, but I would like to point out a couple of drawbacks of the 1st approach:
# It is less general. The 2nd option would/could cover more cases, such as IN queries with a bounded table (not VALUES) and JOINs with bounded tables (or VALUES). JOINs with bounded tables are something users have been asking for and something we would like to have. If we go with the 1st approach now, it will be wasted effort once bounded JOINs are implemented.
# It adds complexity. Despite perhaps being easier to implement, it does not add new features to Flink, while it increases code complexity by adding code that handles only a special case.
# It will complicate the planning logic and further diverge streaming plans from batch plans. This again raises the complexity of the project (more moving parts, more things one has to consider when analysing planning results).

> Don't translate IN to JOIN with VALUES for streaming queries
>
> Key: FLINK-10474
> URL: https://issues.apache.org/jira/browse/FLINK-10474
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.6.1, 1.7.0
> Reporter: Fabian Hueske
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> IN clauses are translated to a JOIN with VALUES if the number of elements in the IN clause exceeds a certain threshold. This should not be done, because a streaming join is very heavy and materializes both inputs (which is fine for the VALUES input but not for the other).
> There are two ways to solve this:
> # don't translate IN to a JOIN at all
> # translate it to a JOIN but use a special join strategy if one input is bounded and final (non-updating)
> Option 1. should be easy to do; option 2. requires much more effort.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
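The trade-off discussed above can be sketched in plain Java (all class and method names here are illustrative, not Flink internals): an IN clause is semantically a set-membership test, while the JOIN-with-VALUES translation conceptually builds the VALUES side into state and probes it with the stream. The results are the same; the cost difference comes from a generic streaming join materializing both inputs.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InRewriteSketch {
    // Option 1 style: evaluate IN as a simple membership predicate,
    // with no join operator and no materialized state for the probe side.
    public static boolean inPredicate(int value, Set<Integer> inSet) {
        return inSet.contains(value);
    }

    // JOIN-with-VALUES style: conceptually join the stream against a
    // VALUES relation. A generic streaming join would keep state for
    // BOTH sides, even though only the VALUES side is bounded.
    public static List<Integer> joinWithValues(List<Integer> stream, List<Integer> values) {
        Set<Integer> buildSide = values.stream().collect(Collectors.toSet());
        return stream.stream().filter(buildSide::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(joinWithValues(List.of(1, 2, 3, 4, 5), List.of(2, 4, 9))); // [2, 4]
    }
}
```

Both forms filter the stream by membership in {2, 4, 9}; option 2 in the ticket amounts to teaching the join operator that the build side is bounded and final, so the probe side never needs to be materialized.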
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642875#comment-16642875 ] ASF GitHub Bot commented on FLINK-8581:
---
zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223575788

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
## @@ -115,8 +115,12 @@ public boolean isRegisteredAsAvailable() {
 	@Override
 	public boolean isAvailable() {
 		// BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)!
-		return hasBuffersAvailable() &&
-			(numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
+		return hasBuffersAvailable() && !isBlocked();
+	}
+
+	@Override
+	public boolean isBlocked() {

Review comment: I think the name `isBlocked` does not convey the specific semantics. How about changing it to `isCreditsAvailable()` directly? That way, `isAvailable()` breaks down into `buffersAvailable()` and `creditsAvailable()`, and the following private method `isAvailable(BufferAndBacklog bufferAndBacklog)` may also reuse `isCreditsAvailable()`.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Improve performance for low latency network
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
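The availability condition in the diff above can be sketched as a standalone class. This mirrors only the logic visible in the review snippet, not the real `CreditBasedSequenceNumberingViewReader`: an event may always be sent, while a data buffer additionally requires at least one credit from the receiver.

```java
// Illustrative sketch of the credit-based availability check under review.
// Field and method names follow the snippet; the class itself is hypothetical.
public class AvailabilitySketch {
    private final int numCreditsAvailable;
    private final int queuedBuffers;
    private final boolean nextBufferIsEvent;

    public AvailabilitySketch(int credits, int buffers, boolean nextIsEvent) {
        this.numCreditsAvailable = credits;
        this.queuedBuffers = buffers;
        this.nextBufferIsEvent = nextIsEvent;
    }

    public boolean hasBuffersAvailable() {
        return queuedBuffers > 0;
    }

    // The more descriptive name suggested in the review: events bypass
    // flow control, data buffers consume receiver credits.
    public boolean isCreditsAvailable() {
        return numCreditsAvailable > 0 || nextBufferIsEvent;
    }

    public boolean isAvailable() {
        return hasBuffersAvailable() && isCreditsAvailable();
    }
}
```

With this decomposition, `isAvailable()` reads as "there is something to send AND we are allowed to send it", which is the semantic point of the rename.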
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642866#comment-16642866 ] ASF GitHub Bot commented on FLINK-8581:
---
zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223574052

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
## @@ -119,6 +141,22 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro
 		}
 	}
+
+	private void flushReaders(long flushTimeout) throws Exception {
+		List readersToFlush = periodicFlushes.getReaders(flushTimeout);
+
+		boolean wasEmpty = availableReaders.isEmpty();
+
+		for (NetworkSequenceViewReader reader : readersToFlush) {
+			if (!reader.isRegisteredAsAvailable() && !reader.isBlocked()) {

Review comment: I think the flush operation means that if this reader has both an unfinished `BufferConsumer` and credits, we still want to transport this buffer to reduce latency. So the conditions for an available reader should cover both available buffers and available credits. But the current conditions only confirm that the reader has available credits before inserting it into the queue. If we poll this reader from the queue and then find that the next buffer is null, it does not make sense to have registered the reader as available here. So would it be reasonable to adjust the conditions here to confirm that this reader has both credits and unfinished buffers?
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642854#comment-16642854 ] ASF GitHub Bot commented on FLINK-10135:

yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-428072796

@zentol Thanks for your suggestion, I have updated the implementation, please review it again.

> The JobManager doesn't report the cluster-level metrics
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
> Issue Type: Bug
> Components: JobManager, Metrics
> Affects Versions: 1.5.0, 1.6.0, 1.7.0
> Reporter: Joey Echeverria
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
>
> In [the documentation for metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] in the Flink 1.5.0 release, it says that the following metrics are reported by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint ({{http://:8081/jobmanager/metrics}}), those metrics don't appear.
[jira] [Commented] (FLINK-10126) There should be a Scala DataSource
[ https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642852#comment-16642852 ] ASF GitHub Bot commented on FLINK-10126:

yanghua commented on a change in pull request #6738: [FLINK-10126] There should be a Scala DataSource
URL: https://github.com/apache/flink/pull/6738#discussion_r223569915

## File path: pom.xml
## @@ -1613,6 +1613,7 @@ under the License.
 	org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 	org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
 	org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+	org.apache.flink.api.scala.ExecutionEnvironment

Review comment: I temporarily added it to the exclusion list so that Travis can continue compiling and verifying other changes. cc @tillrohrmann and @StephanEwen

> There should be a Scala DataSource
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
> Issue Type: Improvement
> Reporter: Alexis Sarda-Espinosa
> Assignee: vinoyang
> Priority: Minor
> Labels: datasource, pull-request-available, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, whereas the Scala version returns a DataSet. There is no Scala DataSource wrapper, and the Scala DataSet does not provide the Java DataSource methods, such as getSplitDataProperties.
[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api
[ https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642850#comment-16642850 ] ASF GitHub Bot commented on FLINK-9682:
---
yanghua commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r223569543

## File path: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
## @@ -823,12 +823,29 @@ public JobExecutionResult execute() throws Exception {
 	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
 	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 	 *
-	 * The program execution will be logged and displayed with the given job name.
+	 * The program execution will be logged and displayed with the given job name and a generated
+	 * default description.
 	 *
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 * @throws Exception Thrown, if the program executions fails.
 	 */
-	public abstract JobExecutionResult execute(String jobName) throws Exception;
+	public JobExecutionResult execute(String jobName) throws Exception {
+		return execute(jobName, "");
+	}
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of the program that have
+	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+	 * writing results (e.g. {@link DataSet#writeAsText(String)},
+	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+	 *
+	 * The program execution will be logged and displayed with the given job name and description.
+	 *
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception Thrown, if the program executions fails.
+	 */
+	public abstract JobExecutionResult execute(String jobName, String jobDescription) throws Exception;

Review comment: Solved some compatibility issues; now only this fundamental problem remains. cc @dawidwys and @tillrohrmann @aljoscha

> Add setDescription to execution environment and provide description field for the rest api
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Webfrontend
> Affects Versions: 1.5.0
> Reporter: Elias Levy
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution environment. In an environment where many versions of a job may be executing, such as a development or test environment, identifying which running job is of a specific version via the UI can be difficult unless the version is embedded into the job name given to {{execute}}. But the job name is used for other purposes, such as for namespacing metrics. Thus, it is not ideal to modify the job name, as that could require modifying metric dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, {{setDescription}}, that would allow a user to pass in an arbitrary description that would be displayed in the dashboard, allowing users to distinguish jobs.
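The diff in this review uses a common backward-compatibility pattern: the old abstract method becomes a concrete overload that delegates to a new abstract two-argument variant. A minimal sketch, with `EnvSketch` and the `String` return type as placeholders rather than Flink types:

```java
// Sketch of the delegating-overload pattern from the diff above.
// EnvSketch stands in for ExecutionEnvironment; the return type is
// simplified from JobExecutionResult to String for illustration.
public abstract class EnvSketch {
    // Old entry point: kept source-compatible for existing callers,
    // now forwarding an empty description to the new variant.
    public String execute(String jobName) throws Exception {
        return execute(jobName, "");
    }

    // New abstract method that concrete environments implement.
    public abstract String execute(String jobName, String jobDescription) throws Exception;
}
```

Existing callers of `execute(jobName)` keep working unchanged. The compatibility catch, which is presumably the "fundamental problem" the comment alludes to, is that any existing subclass implementing only the old abstract `execute(String)` no longer satisfies the abstract contract and must be updated to implement the two-argument method.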
[jira] [Commented] (FLINK-10514) change Tachyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642848#comment-16642848 ] vinoyang commented on FLINK-10514:
--
Hi [~yangxiaoshuo], usually you need to send an email to the dev mailing list to request JIRA contribution permission. I will ping [~till.rohrmann] and [~Zentol] to give you JIRA permission; once you have it, you can claim the issue yourself.

> change Tachyon to Alluxio
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Reporter: yangxiaoshuo
> Assignee: Guibo Pan
> Priority: Minor
> Labels: pull-request-available
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the doc as well.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642790#comment-16642790 ] ASF GitHub Bot commented on FLINK-8581:
---
zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223557953

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
## @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
 			}
 		}
 	}
+
+	private static class RegisterPeriodicFlushEvent {
+		private final NetworkSequenceViewReader reader;
+		private final long flushTimeout;
+
+		public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+			this.reader = checkNotNull(reader);
+			this.flushTimeout = flushTimeout;
+		}
+
+		public NetworkSequenceViewReader getReader() {
+			return reader;
+		}
+
+		public long getFlushTimeout() {
+			return flushTimeout;
+		}
+	}
+
+	private static class PeriodicFlushes {
+		private final Map> periodicFlushes = new HashMap<>();
+		private final Map flushTimeouts = new HashMap<>();
+		private final Map> scheduledEvents = new HashMap<>();
+
+		public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) {

Review comment: Remove `public` from all the methods in this inner class?
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642779#comment-16642779 ] ASF GitHub Bot commented on FLINK-8581:
---
zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223556150

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
## @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
 			}
 		}
 	}
+
+	private static class RegisterPeriodicFlushEvent {
+		private final NetworkSequenceViewReader reader;
+		private final long flushTimeout;
+
+		public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+			this.reader = checkNotNull(reader);
+			this.flushTimeout = flushTimeout;
+		}
+
+		public NetworkSequenceViewReader getReader() {
+			return reader;
+		}
+
+		public long getFlushTimeout() {
+			return flushTimeout;
+		}
+	}
+
+	private static class PeriodicFlushes {

Review comment: Should we clear the related maps and cancel the futures in `releaseAllResources` when it is triggered by `exceptionCaught` or `channelInactive`?
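The cleanup question raised above, cancelling outstanding scheduled work and clearing the per-reader maps on channel teardown, can be sketched with a `ScheduledExecutorService` standing in for the Netty event loop. All names besides the flush-timeout idea are hypothetical and simplified from the PR's `PeriodicFlushes`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative per-reader periodic flush bookkeeping. Readers are keyed by
// a String id here instead of NetworkSequenceViewReader.
public class PeriodicFlushSketch {
    private final Map<String, ScheduledFuture<?>> scheduled = new HashMap<>();
    // Daemon thread so the sketch never blocks JVM shutdown.
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-flusher");
            t.setDaemon(true);
            return t;
        });

    // Register a reader for periodic flushing at a fixed timeout;
    // double registration is a bug, mirroring the checkState in the PR.
    public synchronized void register(String readerId, long flushTimeoutMillis, Runnable flush) {
        if (scheduled.containsKey(readerId)) {
            throw new IllegalStateException("already registered: " + readerId);
        }
        scheduled.put(readerId, executor.scheduleAtFixedRate(
            flush, flushTimeoutMillis, flushTimeoutMillis, TimeUnit.MILLISECONDS));
    }

    // The cleanup the review asks about: on exceptionCaught/channelInactive,
    // cancel every outstanding future and clear the bookkeeping maps so no
    // flush task fires against a released reader.
    public synchronized void releaseAllResources() {
        scheduled.values().forEach(future -> future.cancel(false));
        scheduled.clear();
    }
}
```

The design point is that every entry put into the maps at registration time has a matching removal path in `releaseAllResources()`, so an abnormal channel shutdown cannot leak scheduled tasks.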
[jira] [Commented] (FLINK-10472) Add CBRT math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642767#comment-16642767 ] ASF GitHub Bot commented on FLINK-10472:

Guibo-Pan commented on issue #6783: [FLINK-10472] [table] Add CBRT math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6783#issuecomment-428051714

@aai95 From my side, I wonder whether we are going to keep adding "the fourth root", "the fifth root" and so on, or add a standard "the nth root" function instead. In that case, I think consistency should be taken into consideration. It's just an opinion; feel free to keep this as-is or discuss further if you disagree. Thanks.

> Add CBRT math function supported in Table API and SQL
>
> Key: FLINK-10472
> URL: https://issues.apache.org/jira/browse/FLINK-10472
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Aleksei Izmalkin
> Assignee: Aleksei Izmalkin
> Priority: Minor
> Labels: pull-request-available
>
> Implement the function to calculate the cube root.
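For reference, Java already provides a cube root via `java.lang.Math#cbrt`, which handles negative arguments correctly, unlike `Math.pow(x, 1.0 / 3)`, which returns NaN for negative bases. A generalized n-th root, as floated in the comment, might look like the sketch below; `nthRoot` is a hypothetical helper, not an existing Flink or Calcite function.

```java
public class RootSketch {
    // Hypothetical generalized n-th root. Odd roots of negative numbers
    // are real, so handle that case explicitly; even roots of negative
    // numbers fall through to Math.pow and yield NaN.
    public static double nthRoot(double x, int n) {
        if (n % 2 == 1 && x < 0) {
            return -Math.pow(-x, 1.0 / n);
        }
        return Math.pow(x, 1.0 / n);
    }

    public static void main(String[] args) {
        System.out.println(Math.cbrt(27.0));  // 3.0
        System.out.println(Math.cbrt(-8.0)); // -2.0
        System.out.println(nthRoot(-8.0, 3)); // approximately -2.0
    }
}
```

Note that `Math.cbrt` is specified as correctly rounded, while the `pow`-based `nthRoot` is only approximate, which is one argument for dedicated root functions over a generic `POWER(x, 1/n)` rewrite.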
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642757#comment-16642757 ] ASF GitHub Bot commented on FLINK-8581:
---
zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223553034

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
## @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
 			}
 		}
 	}
+
+	private static class RegisterPeriodicFlushEvent {
+		private final NetworkSequenceViewReader reader;
+		private final long flushTimeout;
+
+		public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+			this.reader = checkNotNull(reader);
+			this.flushTimeout = flushTimeout;
+		}
+
+		public NetworkSequenceViewReader getReader() {
+			return reader;
+		}
+
+		public long getFlushTimeout() {
+			return flushTimeout;
+		}
+	}
+
+	private static class PeriodicFlushes {
+		private final Map> periodicFlushes = new HashMap<>();
+		private final Map flushTimeouts = new HashMap<>();
+		private final Map> scheduledEvents = new HashMap<>();
+
+		public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) {
+			checkState(!flushTimeouts.containsKey(reader));
+			checkState(flushTimeout > 0);

Review comment: If we check the `flushTimeout` at the beginning of the `ResultPartition` stack, as mentioned above, we do not need to care about it in the subsequent processing.
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642749#comment-16642749 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223552250 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ## @@ -97,6 +107,18 @@ public void run() { }); } + void registerPeriodicFlush(NetworkSequenceViewReader reader, long flushTimeout) { + if (flushTimeout == 0) { Review comment: This condition check can be done earlier in `ResultPartition` or `ResultSubpartition` stack? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve performance for low latency network > --- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
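The suggestion to validate the timeout earlier can be sketched as follows; `FlushConfig` is a hypothetical stand-in for wherever the timeout enters the `ResultPartition` stack, not actual Flink code.

```java
// Illustrative sketch (not actual Flink code): validate the flush timeout
// once, at configuration time, so that downstream code such as the netty
// handler can assume it is already positive and skip the special case.
class FlushConfig {
    private final long flushTimeoutMillis;

    FlushConfig(long flushTimeoutMillis) {
        // A timeout of 0 would mean "no periodic flushing"; callers should
        // skip registration entirely in that case instead of passing 0 down.
        if (flushTimeoutMillis <= 0) {
            throw new IllegalArgumentException(
                "flushTimeout must be > 0, was: " + flushTimeoutMillis);
        }
        this.flushTimeoutMillis = flushTimeoutMillis;
    }

    long getFlushTimeoutMillis() {
        return flushTimeoutMillis;
    }
}
```

Centralizing the check this way is the design choice the review comment argues for: one validation point instead of repeated `if (flushTimeout == 0)` branches downstream.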
[jira] [Commented] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering
[ https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642744#comment-16642744 ] Biao Liu commented on FLINK-9788: - After checking the log file, I believe this is a critical bug. If the scenario below happens, the job will never recover, as the log shows.
1. A failover happens due to the loss of a TM; the ExecutionGraph tries to restart itself.
2. The restarter(1) is in ExecutionGraph.restart(), resetting all executions.
3. Another fatal error happens and triggers ExecutionGraph.failGlobal. Since the state of the ExecutionGraph is RESTARTING, this increases the global version and tries to restart the ExecutionGraph as well.
4. The restarter(1) fails because the global version is mismatched; some executions have been reset to CREATED while others have not.
5. Because the restarter(1) failed, it triggers another failGlobal without changing the state of the ExecutionGraph (RESTARTING).
6. The executions reset in step 4 will keep failing forever during restarting.
I believe the problem is that the RESTARTING state of the ExecutionGraph is not a safe state in which restarting can be done without any cancellation. Maybe we should perform a cancellation while the state is RESTARTING. > ExecutionGraph Inconsistency prevents Job from recovering > - > > Key: FLINK-9788 > URL: https://issues.apache.org/jira/browse/FLINK-9788 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.0 > Environment: Rev: 4a06160 > Hadoop 2.8.3 >Reporter: Gary Yao >Priority: Critical > Fix For: 1.7.0, 1.6.2 > > Attachments: jobmanager_5000.log > > > Deployment mode: YARN job mode with HA > After killing many TaskManagers in succession, the ExecutionGraph ran into > an inconsistent state, which prevented job recovery.
> The following stacktrace was logged in the JobManager log several hundred > times per second: > {noformat} > -08 16:47:18,855 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph >- Job General purpose test job (37a794195840700b98feb23e99f7ea24) > switched from state RESTARTING to RESTARTING. > 2018-07-08 16:47:18,856 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting > the job General purpose test job (37a794195840700b98feb23e99f7ea24). > 2018-07-08 16:47:18,857 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph- Resetting > execution vertex Source: Custom Source -> Timestamps/Watermarks (1/10) for > new execution. > 2018-07-08 16:47:18,857 WARN > org.apache.flink.runtime.executiongraph.ExecutionGraph- Failed to > restart the job. > java.lang.IllegalStateException: Cannot reset a vertex that is in > non-terminal state CREATED > at > org.apache.flink.runtime.executiongraph.ExecutionVertex.resetForNewExecution(ExecutionVertex.java:610) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:573) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1251) > at > org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59) > at > org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > The resulting jobmanager log file was 4.7 GB in size. Find attached the first > 5000 lines of the log file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
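The race described in the steps above boils down to a version check; the sketch below is a hypothetical simplification of the global-modification-version mechanism, not the real `ExecutionGraph` code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplification of the versioned-restart race described above:
// a restart only proceeds if the global modification version it captured is
// still current; a concurrent failGlobal() bumps the version and aborts it.
class VersionedRestarter {
    private final AtomicLong globalModVersion = new AtomicLong(0);

    /** Called on a fatal error; invalidates any in-flight restart attempt. */
    long failGlobal() {
        return globalModVersion.incrementAndGet();
    }

    /** Returns true if the restart may proceed, false if it must abort. */
    boolean restart(long expectedVersion) {
        // The danger in FLINK-9788 is aborting halfway through resetting
        // executions: some end up in CREATED while others do not, and the
        // graph state stays RESTARTING with no one left to recover it.
        return globalModVersion.get() == expectedVersion;
    }
}
```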
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642745#comment-16642745 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223550777 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -77,6 +77,14 @@ public void flush() { } } + @Override + public void registerPeriodicFlush(long flushTimeout) { Review comment: 1. The behavior of this method may be simple like `notifyDataAvailable`? So we can reduce `synchronized` part. ``` if (readView != null) { readView. registerPeriodicFlush(flushTimeout); } ``` 2. This implementation is same in `SpillableSubpartition#registerPeriodicFlush`, how about implementing this method in the parent `ResultSubpartition#registerPeriodicFlush`? And the relevant change is also defining protected `readView` in `ResultSubpartition`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve performance for low latency network > --- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
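The reviewer's suggestion, hoisting the shared implementation into the parent class and delegating to the read view, might look roughly like this. Class and method names mirror the discussion, but the code is an illustrative sketch, not the actual Flink source.

```java
// Illustrative sketch: the common registerPeriodicFlush implementation lives
// in the abstract parent, and subclasses like PipelinedSubpartition or
// SpillableSubpartition no longer need to duplicate it.
abstract class AbstractSubpartition {
    interface ReadView {
        void registerPeriodicFlush(long flushTimeout);
    }

    /** The read view, if one has been created; protected as suggested above. */
    protected volatile ReadView readView;

    void registerPeriodicFlush(long flushTimeout) {
        ReadView view = readView; // read the field once to avoid a racy null check
        if (view != null) {
            view.registerPeriodicFlush(flushTimeout);
        }
    }
}

// A trivial concrete subclass, standing in for e.g. PipelinedSubpartition.
class PipelinedSubpartitionSketch extends AbstractSubpartition {}
```

This mirrors the `notifyDataAvailable` shape mentioned in the comment: a null-guarded delegation with no `synchronized` block.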
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642742#comment-16642742 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223551011 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ## @@ -106,11 +109,32 @@ protected Throwable getFailureCause() { abstract public void flush(); + /** +* Remote subpartitions support automatic periodic flush. This is the method to register it. +* Can only by used after {@link #isLocal()} state is known. Review comment: by used -> be used? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve performance for low latency network > --- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642741#comment-16642741 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223550777 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ## @@ -77,6 +77,14 @@ public void flush() { } } + @Override + public void registerPeriodicFlush(long flushTimeout) { Review comment: 1. The behavior of this method may be simple like `notifyDataAvailable`? So we can reduce `synchronized` part. ``` if (readView != null) { readView. registerPeriodicFlush(flushTimeout); } ``` 2. This implementation is same in `SpillableSubpartition#registerPeriodicFlush`, maybe we can put this method in the parent `ResultSubpartition#registerPeriodicFlush`? And the relevant change is also defining protected `readView` in `ResultSubpartition`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve performance for low latency network > --- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10514) change Tachyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642719#comment-16642719 ] yangxiaoshuo commented on FLINK-10514: -- Thanks! I also wonder how to assign an issue to myself. [~guibopan] Could you give me some advice? > change Tachyon to Alluxio > - > > Key: FLINK-10514 > URL: https://issues.apache.org/jira/browse/FLINK-10514 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: yangxiaoshuo >Assignee: Guibo Pan >Priority: Minor > Labels: pull-request-available > > Since *Tachyon* was renamed to *Alluxio*, we should change the doc as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8581) Improve performance for low latency network
[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642714#comment-16642714 ] ASF GitHub Bot commented on FLINK-8581: --- zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty URL: https://github.com/apache/flink/pull/6698#discussion_r223546433 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ## @@ -94,6 +96,19 @@ /** The subpartitions of this partition. At least one. */ private final ResultSubpartition[] subpartitions; + /** +* Subset of {@code subpartitions} that are definitely local. We can only determine whether a +* subpartition is local or not once it's read view was created. Review comment: it's ->its This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve performance for low latency network > --- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10514) change Tachyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yangxiaoshuo updated FLINK-10514: - Description: Since the *Tachyon* renamed to *Alluxio,* we should change doc as well. (was: Since the *Tachyon* renameed to *Alluxio,* we should change doc as well.) Summary: change Tachyon to Alluxio (was: change Tychyon to Alluxio) > change Tachyon to Alluxio > - > > Key: FLINK-10514 > URL: https://issues.apache.org/jira/browse/FLINK-10514 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: yangxiaoshuo >Assignee: Guibo Pan >Priority: Minor > Labels: pull-request-available > > Since the *Tachyon* renamed to *Alluxio,* we should change doc as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642687#comment-16642687 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223541560 ## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file); Review comment: Well, I first moved the logic of checking the BOM from FileInputFormat to DelimitedInputFormat. I want to use a Map to cache the BOM encoding of the file, using the filename as the key and the BOM encoding as the value. If the filename exists in the Map, the cached value is read; if it does not, the BOM encoding of the file is read. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > UTF-16 support for TextInputFormat > -- > > Key: FLINK-10134 > URL: https://issues.apache.org/jira/browse/FLINK-10134 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.2 >Reporter: David Dreyfus >Priority: Blocker > Labels: pull-request-available > > It does not appear that Flink supports a charset encoding of "UTF-16". In > particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) > to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
> > TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), > which sets TextInputFormat.charsetName and then modifies the previously set > delimiterString to construct the proper byte string encoding of the the > delimiter. This same charsetName is also used in TextInputFormat.readRecord() > to interpret the bytes read from the file. > > There are two problems that this implementation would seem to have when using > UTF-16. > # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will > return a Big Endian byte sequence including the Byte Order Mark (BOM). The > actual text file will not contain a BOM at each line ending, so the delimiter > will never be read. Moreover, if the actual byte encoding of the file is > Little Endian, the bytes will be interpreted incorrectly. > # TextInputFormat.readRecord() will not see a BOM each time it decodes a > byte sequence with the String(bytes, offset, numBytes, charset) call. > Therefore, it will assume Big Endian, which may not always be correct. [1] > [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95] > > While there are likely many solutions, I would think that all of them would > have to start by reading the BOM from the file when a Split is opened and > then using that BOM to modify the specified encoding to a BOM specific one > when the caller doesn't specify one, and to overwrite the caller's > specification if the BOM is in conflict with the caller's specification. That > is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, > Flink should rewrite the charsetName as UTF-16LE. > I hope this makes sense and that I haven't been testing incorrectly or > misreading the code. > > I've verified the problem on version 1.4.2. I believe the problem exists on > all versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
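The BOM-sniffing approach proposed above can be sketched as follows; `BomSniffer` is a hypothetical helper written for this discussion, not part of Flink's `FileInputFormat`.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Sketch of the idea discussed above (not actual Flink code): inspect the
// first bytes of a file once per split and map the byte order mark to a
// concrete charset, so a bare "UTF-16" request becomes UTF-16LE or UTF-16BE.
class BomSniffer {
    /** Returns the charset implied by a leading BOM, or the fallback if there is none. */
    static Charset detect(byte[] head, Charset fallback) {
        if (head.length >= 3
                && (head[0] & 0xFF) == 0xEF
                && (head[1] & 0xFF) == 0xBB
                && (head[2] & 0xFF) == 0xBF) {
            return StandardCharsets.UTF_8; // EF BB BF
        }
        if (head.length >= 2) {
            if ((head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
                return StandardCharsets.UTF_16BE; // FE FF
            }
            if ((head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
                return StandardCharsets.UTF_16LE; // FF FE
            }
        }
        return fallback;
    }
}
```

Per the issue description, the detected charset would then override a caller-specified `UTF-16BE` when the BOM indicates little-endian, and vice versa.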
[jira] [Commented] (FLINK-10404) Declarative resource management
[ https://issues.apache.org/jira/browse/FLINK-10404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642640#comment-16642640 ] JIN SUN commented on FLINK-10404: - Glad to see the design document, will take a look and comment. > Declarative resource management > --- > > Key: FLINK-10404 > URL: https://issues.apache.org/jira/browse/FLINK-10404 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > This is the umbrella issue to track the progress on implementing declarative > resource management. > Declarative resource management is a change to Flink's current slot > allocation protocol. Instead of letting the {{JobMaster}} ask for each slot > individually, it will tell the {{ResourceManager}} its current need (min, > target, max) of slots. Based on that, the {{ResourceManager}} will assign > available slots or start new {{TaskExecutors}} to fulfill the request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10498) Decouple ExecutionGraph from JobMaster
[ https://issues.apache.org/jira/browse/FLINK-10498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642635#comment-16642635 ] JIN SUN commented on FLINK-10498: - this seems related to https://issues.apache.org/jira/browse/FLINK-10429 ? > Decouple ExecutionGraph from JobMaster > -- > > Key: FLINK-10498 > URL: https://issues.apache.org/jira/browse/FLINK-10498 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > With declarative resource management we want to react to the set of available > resources. Thus, we need a component which is responsible for scaling the > {{ExecutionGraph}} accordingly. In order to better do this and separate > concerns, it is beneficial to introduce a {{Scheduler/ExecutionGraphDriver}} > component which is in charge of the {{ExecutionGraph}}. This component owns > the {{ExecutionGraph}} and is allowed to modify it. In the first version, > this component will simply accommodate all the existing logic of the > {{JobMaster}} and the respective {{JobMaster}} methods are forwarded to this > component. > This new component should not change the existing behaviour of Flink. > Later this component will be in charge of announcing the required resources, > deciding when to rescale and executing the rescaling operation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10380) Check if key is not nul before assign to group in KeyedStream
[ https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642359#comment-16642359 ] ASF GitHub Bot commented on FLINK-10380: satybald commented on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method URL: https://github.com/apache/flink/pull/6724#issuecomment-427952541 @StephanEwen thank you for the feedback! From what I see, `KeyGroupRangeAssignment#assignToKeyGroup` is a utility method. It has 36 usages across different classes: state backends, operators, and tests. `AbstractKeyedStateBackend#setCurrentKey`, which uses the utility method, has 296 usages. I'm a bit afraid that catching and testing the NPE earlier in the chain would require a significant effort to rewrite everything without much benefit, since try/catch doesn't make the code cleaner, and adding the same error message everywhere would violate the DRY principle. Also, determining whether the key is null earlier in the chain via try/catch rather than preconditions might lead to false positives, since a child method could throw an NPE for another reason. From what I see, `computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);` is the place where the key group index is calculated and the NPE is thrown when the key is null. Would you agree that catching the NPE in the utility class and rethrowing it with a specific message is a viable alternative? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Check if key is not nul before assign to group in KeyedStream > - > > Key: FLINK-10380 > URL: https://issues.apache.org/jira/browse/FLINK-10380 > Project: Flink > Issue Type: Task >Affects Versions: 1.6.0 >Reporter: Sayat Satybaldiyev >Priority: Minor > Labels: pull-request-available > > If a user creates a KeyedStream and partition by key which might be null, > Flink job throws NullPointerExceptoin at runtime. However, NPE that Flink > throws hard to debug and understand as it doesn't refer to place in Flink job. > *Suggestion:* > Add precondition that checks if the key is not null and throw a descriptive > error if it's a null. > > *Job Example*: > > {code:java} > DataStream stream = env.fromCollection(Arrays.asList("aaa", "bbb")) > .map(x -> (String)null) > .keyBy(x -> x);{code} > > > An error that is thrown: > > {code:java} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: > java.lang.RuntimeException > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61) > Caused by: java.lang.RuntimeException > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110 > INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC > service. 
> at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59) > at > org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48) > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63) > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) > at > org.apache.flink.runtime.io.network.api.wr
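The precondition under discussion could look roughly like the following self-contained sketch. The class name, error message, and `computeKeyGroupForKeyHash` body here are illustrative stand-ins, not the actual Flink source; the point is failing fast with a descriptive message instead of letting `key.hashCode()` surface an opaque NPE.

```java
// Hypothetical sketch of the proposed null-check; not the actual Flink source.
final class KeyGroupRangeAssignmentSketch {

    static int assignToKeyGroup(Object key, int maxParallelism) {
        // Fail fast with a descriptive message instead of an opaque NPE
        // thrown later from key.hashCode().
        if (key == null) {
            throw new NullPointerException(
                "Assigned key must not be null! "
                    + "This can happen when a KeySelector returns null.");
        }
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    // Simplified stand-in for Flink's hash-based key-group computation.
    static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    public static void main(String[] args) {
        System.out.println(assignToKeyGroup("aaa", 128));
        try {
            assignToKeyGroup(null, 128);
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

With such a check, the user-facing failure names the real cause (a null key) at the point where keys are assigned, rather than a bare `NullPointerException` deep in the stack.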
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642336#comment-16642336 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223465173 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -909,6 +926,70 @@ private void clearDispatcherState() { terminateJobManagerRunners(); } + private void instantiateJobManagerOverviewMetrics(MetricGroup jobManagerMetricGroup) { + long defaultTimeoutValue = configuration.getLong(RestOptions.CONNECTION_TIMEOUT); + Time defaultRequestTimeout = Time.milliseconds(defaultTimeoutValue); + + jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> { + try { + return (long) resourceManagerGateway Review comment: This method results in 3 separate RPC calls being made every time metrics are being requested. If the dispatcher cannot reasonably provide this information then they should be exposed at the resource manager instead. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
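The reviewer's concern above — one RPC round-trip per gauge, three per metrics request — is commonly addressed by fetching the overview once and deriving all gauges from the cached result. Below is a plain-Java sketch of that pattern; the `ResourceOverview` shape and its field names are assumed for illustration, not Flink's actual API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: derive all three gauges from one cached overview
// instead of issuing one RPC per gauge. Not the actual Flink implementation.
final class CachedOverviewSketch {

    /** Minimal stand-in for a resource overview (field names assumed). */
    static final class ResourceOverview {
        final long numTaskManagers;
        final long totalSlots;
        final long freeSlots;

        ResourceOverview(long numTaskManagers, long totalSlots, long freeSlots) {
            this.numTaskManagers = numTaskManagers;
            this.totalSlots = totalSlots;
            this.freeSlots = freeSlots;
        }
    }

    /** Memoizes an expensive fetch so several gauges share one result per interval. */
    static final class ExpiringCache<T> implements Supplier<T> {
        private final Supplier<T> fetch;
        private final long ttlNanos;
        private long fetchedAt;
        private T cached;

        ExpiringCache(Supplier<T> fetch, long ttlNanos) {
            this.fetch = fetch;
            this.ttlNanos = ttlNanos;
        }

        @Override
        public synchronized T get() {
            long now = System.nanoTime();
            if (cached == null || now - fetchedAt > ttlNanos) {
                cached = fetch.get(); // the single "RPC" for this interval
                fetchedAt = now;
            }
            return cached;
        }
    }

    static int fetchCount = 0; // counts simulated RPC round-trips

    public static void main(String[] args) {
        Supplier<ResourceOverview> rpc = () -> {
            fetchCount++; // stands in for the remote requestResourceOverview call
            return new ResourceOverview(2, 8, 3);
        };
        ExpiringCache<ResourceOverview> cache =
            new ExpiringCache<>(rpc, TimeUnit.SECONDS.toNanos(10));

        // Three "gauges" all read from the same cached overview.
        System.out.println("taskManagers=" + cache.get().numTaskManagers);
        System.out.println("slotsTotal=" + cache.get().totalSlots);
        System.out.println("slotsFree=" + cache.get().freeSlots);
        System.out.println("fetches=" + fetchCount); // prints fetches=1
    }
}
```

The same effect can also be achieved, as the reviewer suggests, by registering the gauges on the component that owns the data (the resource manager), which removes the remote call entirely.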
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223468454 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); + invokeLatch.await(60, TimeUnit.SECONDS); + waitForJob(); + } + + @Test + public void testJobManagerMetrics() throws Exception { + assertEquals(1, TestReporter.OPENED_REPORTERS.size()); + TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next(); + + List expectedPatterns = getExpectedPatterns(); + + Collection gaugeNames = reporter.getGauges().values(); + + for (String expectedPattern : expectedPatterns) { + boolean
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642338#comment-16642338 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223468630 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); + invokeLatch.await(60, TimeUnit.SECONDS); + waitForJob(); + } + + @Test + public void testJobManagerMetrics() throws Exception { + assertEquals(1, TestReporter.OPENED_REPORTERS.size()); + TestReporter reporter = TestReporter.OPENED_REPORTE
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642333#comment-16642333 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223467705 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); Review comment: You could simplify this section by simply defining a job that has a blocking UDF and executing it in a separate thread. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL:
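The review suggestion above — run a job with a blocking UDF in a separate thread instead of driving a detached client — boils down to a latch handshake. A plain-Java sketch without Flink dependencies (the "job" is simulated with latches; names are illustrative):

```java
import java.util.concurrent.CountDownLatch;

// Plain-Java sketch of the review suggestion: execute a job with a blocking
// UDF in a separate thread, signal once it is running, then release it.
final class BlockingJobSketch {

    static final CountDownLatch jobRunning = new CountDownLatch(1);
    static final CountDownLatch releaseJob = new CountDownLatch(1);

    /** Stand-in for a blocking invokable/UDF. */
    static void blockingUdf() throws InterruptedException {
        jobRunning.countDown(); // tell the test the job has started
        releaseJob.await();     // block until the test is done inspecting metrics
    }

    public static void main(String[] args) throws InterruptedException {
        Thread jobThread = new Thread(() -> {
            try {
                blockingUdf();
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        });
        jobThread.start();

        jobRunning.await();     // job is now "running": assert on metrics here
        System.out.println("job running");

        releaseJob.countDown(); // unblock the UDF and let the job finish
        jobThread.join();
        System.out.println("job finished");
    }
}
```

The test body then asserts on the metrics between `jobRunning.await()` and `releaseJob.countDown()`, while the job is guaranteed to be running.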
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642335#comment-16642335 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223466964 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -909,6 +926,70 @@ private void clearDispatcherState() { terminateJobManagerRunners(); } + private void instantiateJobManagerOverviewMetrics(MetricGroup jobManagerMetricGroup) { + long defaultTimeoutValue = configuration.getLong(RestOptions.CONNECTION_TIMEOUT); + Time defaultRequestTimeout = Time.milliseconds(defaultTimeoutValue); + + jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> { + try { + return (long) resourceManagerGateway + .requestResourceOverview(defaultRequestTimeout) + .get() + .getNumberFreeSlots(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Request resource overview occurs exception.", e); + } + + return 0L; + }); + + jobManagerMetricGroup.gauge("taskSlotsTotal", () -> { + try { + return (long) resourceManagerGateway + .requestResourceOverview(defaultRequestTimeout) + .get() + .getNumberRegisteredSlots(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Request resource overview occurs exception.", e); + } + + return 0L; + }); + + jobManagerMetricGroup.gauge("numRegisteredTaskManagers", () -> { + try { + return (long) resourceManagerGateway + .requestResourceOverview(defaultRequestTimeout) + .get() + .getNumberTaskManagers(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Request resource overview occurs exception.", e); + } + + 
return 0L; + }); + + jobManagerMetricGroup.gauge("numRunningJobs", () -> { + try { + return (long) requestOverviewForAllJobs(defaultRequestTimeout) Review comment: This results in a wave of RPC calls to every single JM. Would it not be feasible to simply count the number of job IDs contained in `jobManagerRunnerFutures`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
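The reviewer's point is that the dispatcher already tracks one runner future per submitted job, so `numRunningJobs` can be a purely local count with no RPC at all. A minimal sketch of that idea (the map's key type and the field name are stand-ins mirroring the `jobManagerRunnerFutures` field mentioned in the review, not the exact Flink declaration):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: count jobs from the dispatcher's own bookkeeping
// instead of fanning out RPC calls to every JobMaster.
final class NumRunningJobsSketch {

    // Stand-in for the Dispatcher's jobManagerRunnerFutures map.
    final Map<String, CompletableFuture<Void>> jobManagerRunnerFutures =
        new ConcurrentHashMap<>();

    long numRunningJobs() {
        return jobManagerRunnerFutures.size(); // local count, no RPC needed
    }

    public static void main(String[] args) {
        NumRunningJobsSketch dispatcher = new NumRunningJobsSketch();
        dispatcher.jobManagerRunnerFutures.put("job-1", new CompletableFuture<>());
        dispatcher.jobManagerRunnerFutures.put("job-2", new CompletableFuture<>());
        System.out.println("numRunningJobs=" + dispatcher.numRunningJobs()); // prints 2
    }
}
```

Registering the gauge over this local map keeps the metrics path cheap even when many jobs are running.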
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223469702 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); + invokeLatch.await(60, TimeUnit.SECONDS); + waitForJob(); + } + + @Test + public void testJobManagerMetrics() throws Exception { + assertEquals(1, TestReporter.OPENED_REPORTERS.size()); + TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next(); + + List expectedPatterns = getExpectedPatterns(); + + Collection gaugeNames = reporter.getGauges().values(); + + for (String expectedPattern : expectedPatterns) { + boolean
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223466071 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -909,6 +926,70 @@ private void clearDispatcherState() { terminateJobManagerRunners(); } + private void instantiateJobManagerOverviewMetrics(MetricGroup jobManagerMetricGroup) { + long defaultTimeoutValue = configuration.getLong(RestOptions.CONNECTION_TIMEOUT); Review comment: This is not an appropriate timeout as this is completely unrelated to the REST API. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642334#comment-16642334 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223469702 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); + invokeLatch.await(60, TimeUnit.SECONDS); + waitForJob(); + } + + @Test + public void testJobManagerMetrics() throws Exception { + assertEquals(1, TestReporter.OPENED_REPORTERS.size()); + TestReporter reporter = TestReporter.OPENED_REPORTE
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223467705 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); Review comment: You could simplify this section by simple defining a job that has a blocking UDF and executing it in a separate thread. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
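The simplification zentol suggests — a job that blocks until released, driven from a separate thread so the test can inspect metrics while it runs — boils down to a latch pattern. The following is a plain-JDK sketch of that pattern only (the real test would use a blocking `AbstractInvokable` submitted through the cluster client, not a raw `Thread`):

```java
import java.util.concurrent.CountDownLatch;

public class BlockingJobPattern {
    public static void main(String[] args) throws Exception {
        CountDownLatch jobStarted = new CountDownLatch(1);
        CountDownLatch jobRelease = new CountDownLatch(1);
        // the "job": signals that it is running, then blocks like a blocking UDF
        Thread job = new Thread(() -> {
            jobStarted.countDown();
            try {
                jobRelease.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        job.start();
        jobStarted.await();      // job is now running -> metrics could be checked here
        System.out.println("running=" + job.isAlive());
        jobRelease.countDown();  // let the job finish
        job.join();
        System.out.println("running=" + job.isAlive());
    }
}
```

The two latches make the handshake deterministic: the test never asserts before the job is up, and the job never finishes before the assertions ran.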
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223466964 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -909,6 +926,70 @@ private void clearDispatcherState() { terminateJobManagerRunners(); } + private void instantiateJobManagerOverviewMetrics(MetricGroup jobManagerMetricGroup) { + long defaultTimeoutValue = configuration.getLong(RestOptions.CONNECTION_TIMEOUT); + Time defaultRequestTimeout = Time.milliseconds(defaultTimeoutValue); + + jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> { + try { + return (long) resourceManagerGateway + .requestResourceOverview(defaultRequestTimeout) + .get() + .getNumberFreeSlots(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Request resource overview occurs exception.", e); + } + + return 0L; + }); + + jobManagerMetricGroup.gauge("taskSlotsTotal", () -> { + try { + return (long) resourceManagerGateway + .requestResourceOverview(defaultRequestTimeout) + .get() + .getNumberRegisteredSlots(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Request resource overview occurs exception.", e); + } + + return 0L; + }); + + jobManagerMetricGroup.gauge("numRegisteredTaskManagers", () -> { + try { + return (long) resourceManagerGateway + .requestResourceOverview(defaultRequestTimeout) + .get() + .getNumberTaskManagers(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Request resource overview occurs exception.", e); + } + + return 0L; + }); + + jobManagerMetricGroup.gauge("numRunningJobs", () -> { + try { + return (long) requestOverviewForAllJobs(defaultRequestTimeout) Review comment: This results in a wave of RPC calls to 
every single JM. Would it not be feasible to simply count the number of job IDs contained in `jobManagerRunnerFutures`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
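zentol's suggestion — deriving `numRunningJobs` from the dispatcher's own bookkeeping instead of fanning out RPCs — can be sketched with plain JDK types. Note the map name `jobManagerRunnerFutures` follows the review comment, but the `Supplier` stand-in for Flink's `Gauge` and the rest of the scaffolding are illustrative assumptions, not Flink's actual API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class NumRunningJobsGauge {
    public static void main(String[] args) {
        // stand-in for Dispatcher#jobManagerRunnerFutures: one entry per running job
        ConcurrentHashMap<String, CompletableFuture<Void>> jobManagerRunnerFutures =
            new ConcurrentHashMap<>();
        // the gauge simply reports the map size -- no RPC, no timeout needed
        Supplier<Long> numRunningJobs = () -> (long) jobManagerRunnerFutures.size();
        jobManagerRunnerFutures.put("job-1", new CompletableFuture<>());
        jobManagerRunnerFutures.put("job-2", new CompletableFuture<>());
        System.out.println(numRunningJobs.get());
    }
}
```

Because the dispatcher already maintains this map, the gauge reads local state and avoids both the RPC fan-out and the blocking `get()` inside the metric callback.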
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223468630 ## File path: flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for proper initialization of the job manager metrics. 
+ */ +public class JobManagerMetricsITCase { + + private static final String JOB_MANAGER_METRICS_PREFIX = "localhost.jobmanager."; + private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable"; + private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal"; + private static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers"; + private static final String NUM_RUNNING_JOBS = "numRunningJobs"; + + private static CountDownLatch invokeLatch; + + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + MINI_CLUSTER_RESOURCE.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) MINI_CLUSTER_RESOURCE.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); + invokeLatch.await(60, TimeUnit.SECONDS); + waitForJob(); + } + + @Test + public void testJobManagerMetrics() throws Exception { + assertEquals(1, TestReporter.OPENED_REPORTERS.size()); + TestReporter reporter = TestReporter.OPENED_REPORTERS.iterator().next(); + + List expectedPatterns = getExpectedPatterns(); + + Collection gaugeNames = reporter.getGauges().values(); + + for (String expectedPattern : expectedPatterns) { + boolean
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642337#comment-16642337 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r223466071 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ## @@ -909,6 +926,70 @@ private void clearDispatcherState() { terminateJobManagerRunners(); } + private void instantiateJobManagerOverviewMetrics(MetricGroup jobManagerMetricGroup) { + long defaultTimeoutValue = configuration.getLong(RestOptions.CONNECTION_TIMEOUT); Review comment: This is not an appropriate timeout as this is completely unrelated to the REST API. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642185#comment-16642185 ] Andrey Zagrebin commented on FLINK-10295: - The pull request adds another way to provide program args: as a list of strings. The most secure option is to pass program args as a JSON list in the body of the HTTP request. This way is cross-platform. The args then need no tokenisation because they arrive already tokenised in the list. The follow-up issue FLINK-10515 is to still improve the tokenisation, to simplify passing args as a string in the web UI, at least for unix-style escaping. > Tokenisation of Program Args resulting in unexpected results > > > Key: FLINK-10295 > URL: https://issues.apache.org/jira/browse/FLINK-10295 > Project: Flink > Issue Type: Bug > Components: REST, Webfrontend >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Gaurav Singhania >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: sample_request.txt > > > We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes > all the details to run the job as program args against a jar id, including the SQL > query and Kafka details. In version 1.5 the program args are tokenised; as a > result, single quotes (') and double quotes (") are stripped from the arguments. > This results in malformed args. > Attached is a sample request for reference. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
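Why a pre-tokenised list is safer than a single args string can be shown with a small sketch (the SQL string here is made up; the point is only that naive whitespace tokenisation mangles quoted arguments, while a list — e.g. a JSON array in the request body — needs no parsing at all):

```java
import java.util.Arrays;
import java.util.List;

public class ProgramArgsExample {
    public static void main(String[] args) {
        String argsString = "--query \"SELECT * FROM t WHERE name = 'bob'\"";
        // naive whitespace tokenisation shreds the quoted SQL into pieces
        String[] naive = argsString.split("\\s+");
        System.out.println(naive.length);
        // passing the args as a list needs no tokenisation at all:
        // each element is already one complete argument
        List<String> asList = Arrays.asList("--query", "SELECT * FROM t WHERE name = 'bob'");
        System.out.println(asList.size());
    }
}
```

The quoted query that should be a single argument becomes many fragments under whitespace splitting, which is exactly the class of breakage reported in the issue.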
[jira] [Created] (FLINK-10515) Improve tokenisation of program args passed as string
Andrey Zagrebin created FLINK-10515: --- Summary: Improve tokenisation of program args passed as string Key: FLINK-10515 URL: https://issues.apache.org/jira/browse/FLINK-10515 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.7.0 Reporter: Andrey Zagrebin Assignee: Andrey Zagrebin At the moment tokenisation of program args does not respect escape characters. It can be improved to support at least program args separated by spaces with unix style escaping. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
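A minimal sketch of what unix-style tokenisation could look like — an illustration of the behaviour the issue asks for, not the actual Flink implementation: whitespace separates args, single and double quotes group, and backslash escapes the next character (except inside single quotes):

```java
import java.util.ArrayList;
import java.util.List;

public class UnixStyleTokenizer {
    public static List<String> tokenize(String input) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;          // active quote character, or 0 if none
        boolean escaped = false; // previous char was a backslash
        boolean inToken = false;
        for (char c : input.toCharArray()) {
            if (escaped) {
                current.append(c);           // escaped char is taken literally
                escaped = false;
                inToken = true;
            } else if (c == '\\' && quote != '\'') {
                escaped = true;              // backslash escapes, except in single quotes
                inToken = true;
            } else if (quote != 0) {
                if (c == quote) {
                    quote = 0;               // closing quote
                } else {
                    current.append(c);
                }
            } else if (c == '\'' || c == '"') {
                quote = c;                   // opening quote
                inToken = true;
            } else if (Character.isWhitespace(c)) {
                if (inToken) {               // whitespace ends the current token
                    tokens.add(current.toString());
                    current.setLength(0);
                    inToken = false;
                }
            } else {
                current.append(c);
                inToken = true;
            }
        }
        if (inToken) {
            tokens.add(current.toString());
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("--name 'John Doe' --path /tmp/a\\ b"));
    }
}
```

A production version would also need to reject unterminated quotes and dangling escapes; this sketch only shows the core state machine.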
[jira] [Commented] (FLINK-10514) change Tychyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642147#comment-16642147 ] ASF GitHub Bot commented on FLINK-10514: Guibo-Pan opened a new pull request #6806: [FLINK-10514][docs] change tachyon to alluxio in doc URL: https://github.com/apache/flink/pull/6806 ## What is the purpose of the change *Since Tachyon was renamed to Alluxio, replace the remaining Tachyon to Alluxio in docs.* This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > change Tychyon to Alluxio > - > > Key: FLINK-10514 > URL: https://issues.apache.org/jira/browse/FLINK-10514 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: yangxiaoshuo >Assignee: Guibo Pan >Priority: Minor > Labels: pull-request-available > > Since the *Tachyon* renameed to *Alluxio,* we should change doc as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10514) change Tychyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10514: --- Labels: pull-request-available (was: ) > change Tychyon to Alluxio > - > > Key: FLINK-10514 > URL: https://issues.apache.org/jira/browse/FLINK-10514 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: yangxiaoshuo >Assignee: Guibo Pan >Priority: Minor > Labels: pull-request-available > > Since the *Tachyon* renameed to *Alluxio,* we should change doc as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10514) change Tychyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642120#comment-16642120 ] Guibo Pan commented on FLINK-10514: --- It seems that most of the work was already done in [FLINK-4185|https://issues.apache.org/jira/browse/FLINK-4185], with just a little left. I will follow up and finish it. > change Tychyon to Alluxio > - > > Key: FLINK-10514 > URL: https://issues.apache.org/jira/browse/FLINK-10514 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: yangxiaoshuo >Priority: Minor > > Since *Tachyon* was renamed to *Alluxio*, we should change the docs as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10514) change Tychyon to Alluxio
[ https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guibo Pan reassigned FLINK-10514: - Assignee: Guibo Pan > change Tychyon to Alluxio > - > > Key: FLINK-10514 > URL: https://issues.apache.org/jira/browse/FLINK-10514 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: yangxiaoshuo >Assignee: Guibo Pan >Priority: Minor > > Since the *Tachyon* renameed to *Alluxio,* we should change doc as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642090#comment-16642090 ] ASF GitHub Bot commented on FLINK-10340: yanghua commented on issue #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6700#issuecomment-427896213 It's strange: Travis still seems to be in progress, but the detail page shows green. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Cosh math function supported in Table API and SQL > - > > Key: FLINK-10340 > URL: https://issues.apache.org/jira/browse/FLINK-10340 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Sergey Tsvetkov >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Implement a udf for cosh, just like in Oracle > [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642082#comment-16642082 ] ASF GitHub Bot commented on FLINK-10135: yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-427895003 @zentol I have changed the test case, please review it again, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10126) There should be a Scala DataSource
[ https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642080#comment-16642080 ] ASF GitHub Bot commented on FLINK-10126: yanghua commented on issue #6738: [FLINK-10126] There should be a Scala DataSource URL: https://github.com/apache/flink/pull/6738#issuecomment-427894706 Hi @tillrohrmann I have updated this PR based on your suggestion; however, the `japicmp-maven-plugin` reported the incompatibility issue `METHOD_RETURN_TYPE_CHANGED`. It seems the method's return type is checked for binary compatibility. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > There should be a Scala DataSource > -- > > Key: FLINK-10126 > URL: https://issues.apache.org/jira/browse/FLINK-10126 > Project: Flink > Issue Type: Improvement >Reporter: Alexis Sarda-Espinosa >Assignee: vinoyang >Priority: Minor > Labels: datasource, pull-request-available, scala > > In Java, an ExecutionEnvironment's createInput method returns a DataSource, > whereas the Scala version returns a DataSet. There is no Scala DataSource > wrapper, and the Scala DataSet does not provide the Java DataSource methods, > such as getSplitDataProperties. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642059#comment-16642059 ] ASF GitHub Bot commented on FLINK-10399: TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#discussion_r223416786 ## File path: flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java ## @@ -52,30 +52,29 @@ protected void validate(ParameterTool parameter) { } catch (Exception e) { fail(); Review comment: I refactored it as

```java
try {
    byte[] b = InstantiationUtil.serializeObject(parameter);
    final ParameterTool copy = InstantiationUtil.deserializeObject(b, getClass().getClassLoader());
    internalValidate(copy);
} catch (Exception e) {
    e.printStackTrace();
    Assert.fail(e.getMessage());
}
```

This keeps the root cause and saves us from modifying the signature of the methods calling `AbstractParameterToolTest#validate`. If you prefer to propagate it, I am not opposed to it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor ParameterTool#fromArgs > --- > > Key: FLINK-10399 > URL: https://issues.apache.org/jira/browse/FLINK-10399 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > {{ParameterTool#fromArgs}} uses a convoluted implementation that a Flink developer would > struggle to parse quickly. > The main problem is that, when parsing args, we always try to get a key-value > pair, but the implementation iterates with a {{for}} loop, thus introducing an awkward > flag/mutable variable and branches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642053#comment-16642053 ]

ASF GitHub Bot commented on FLINK-10399:

TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223415739

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

```diff
@@ -68,81 +68,46 @@
  * @return A {@link ParameterTool}
  */
 public static ParameterTool fromArgs(String[] args) {
-	Map map = new HashMap(args.length / 2);
-
-	String key = null;
-	String value = null;
-	boolean expectValue = false;
-	for (String arg : args) {
-		// check for -- argument
-		if (arg.startsWith("--")) {
-			if (expectValue) {
-				// we got into a new key, even though we were a value --> current key is one without value
-				if (value != null) {
-					throw new IllegalStateException("Unexpected state");
-				}
-				map.put(key, NO_VALUE_KEY);
-				// key will be overwritten in the next step
-			}
-			key = arg.substring(2);
-			expectValue = true;
-		} // check for - argument
-		else if (arg.startsWith("-")) {
-			// we are waiting for a value, so this is a - prefixed value (negative number)
-			if (expectValue) {
-
-				if (NumberUtils.isNumber(arg)) {
-					// negative number
-					value = arg;
-					expectValue = false;
-				} else {
-					if (value != null) {
-						throw new IllegalStateException("Unexpected state");
-					}
-					// We waited for a value but found a new key. So the previous key doesnt have a value.
-					map.put(key, NO_VALUE_KEY);
-					key = arg.substring(1);
-					expectValue = true;
-				}
-			} else {
-				// we are not waiting for a value, so its an argument
-				key = arg.substring(1);
-				expectValue = true;
-			}
+	final Map map = new HashMap<>(args.length / 2);
+
+	final String errorMessage = "Error parsing arguments '%s' on '%s'.";
+
+	int i = 0;
+	while (i < args.length) {
+		final String key;
+
+		if (args[i].startsWith("--")) {
+			key = args[i].substring(2);
+		} else if (args[i].startsWith("-")) {
+			key = args[i].substring(1);
 		} else {
-			if (expectValue) {
-				value = arg;
-				expectValue = false;
-			} else {
-				throw new RuntimeException("Error parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected value. Please prefix values with -- or -.");
-			}
+			throw new IllegalArgumentException(
+				String.format(errorMessage + " Please prefix keys with -- or -.",
+					Arrays.toString(args), args[i]));
 		}
-		if (value == null && key == null) {
-			throw new IllegalStateException("Value and key can not be null at the same time");
-		}
-		if (key != null && value == null && !expectValue) {
-			throw new IllegalStateException("Value expected but flag not set");
-		}
-		if (key != null && value != null) {
-			map.put(key, value);
```
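The replacement in the diff drops the `expectValue` flag in favor of a single token of lookahead per key. A simplified, self-contained sketch of that strategy (not the exact Flink code: `NO_VALUE_KEY` is inlined here, and `Double.parseDouble` stands in for Commons Lang's `NumberUtils.isNumber` in the negative-number check):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ArgsParser {

	// Marker for a key passed without a value, e.g. "--verbose".
	public static final String NO_VALUE_KEY = "__NO_VALUE_KEY";

	// Simplified sketch of the refactored fromArgs: a single pass with one
	// token of lookahead replaces the old expectValue flag and mutable state.
	public static Map<String, String> fromArgs(String[] args) {
		final Map<String, String> map = new HashMap<>(args.length / 2);

		int i = 0;
		while (i < args.length) {
			final String key;
			if (args[i].startsWith("--")) {
				key = args[i].substring(2);
			} else if (args[i].startsWith("-")) {
				key = args[i].substring(1);
			} else {
				throw new IllegalArgumentException(String.format(
					"Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
					Arrays.toString(args), args[i]));
			}
			i += 1;

			// Lookahead: the next token is this key's value unless it is itself
			// a key. A "-"-prefixed number (e.g. "-1") still counts as a value.
			if (i >= args.length || (isKeyToken(args[i]) && !isNumber(args[i]))) {
				map.put(key, NO_VALUE_KEY);
			} else {
				map.put(key, args[i]);
				i += 1;
			}
		}
		return map;
	}

	private static boolean isKeyToken(String arg) {
		return arg.startsWith("-");
	}

	// Stand-in for Commons Lang's NumberUtils.isNumber.
	private static boolean isNumber(String arg) {
		try {
			Double.parseDouble(arg);
			return true;
		} catch (NumberFormatException e) {
			return false;
		}
	}
}
```

With the lookahead, `--input /tmp -n -1 --verbose` parses to `input=/tmp`, `n=-1`, and `verbose` as a value-less flag, with no state carried between iterations.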
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642049#comment-16642049 ]

ASF GitHub Bot commented on FLINK-10399:

TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223415460

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

(The comment quotes the same @@ -68,81 +68,46 @@ hunk of `fromArgs` shown above, anchored at the line `final String key;`.)

Review comment: It is up to you.
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642043#comment-16642043 ]

ASF GitHub Bot commented on FLINK-10399:

TisonKun commented on issue #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-427886511

@GJL thanks for your review and detailed comments! I have pushed a commit to address most of them. It is midnight in my timezone, so I will be offline for a few hours. I am glad to see any beneficial modifications made while merging.
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642021#comment-16642021 ]

ASF GitHub Bot commented on FLINK-10247:

Clarkkkkk commented on issue #6759: [FLINK-10247][Metrics] Run MetricQueryService in a dedicated actor system
URL: https://github.com/apache/flink/pull/6759#issuecomment-427880375

Well, that makes sense @tillrohrmann. And maybe we will rename the actor system member variable if other components rely on it.

> Run MetricQueryService in separate thread pool
> ----------------------------------------------
>
>                 Key: FLINK-10247
>                 URL: https://issues.apache.org/jira/browse/FLINK-10247
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Metrics
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
> In order to make the {{MetricQueryService}} run independently of the main Flink components, it should get its own dedicated thread pool assigned.
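The isolation the ticket asks for can be illustrated with a plain dedicated executor; every name below is illustrative and not Flink's actual `MetricQueryService` API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IsolatedMetricService {

	// A dedicated single-thread executor, so that slow metric queries cannot
	// block the threads of the main components.
	private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
		Thread t = new Thread(r, "metric-query-service");
		t.setDaemon(true); // metrics alone should not keep the JVM alive
		return t;
	});

	public Future<String> queryMetrics() {
		// Stand-in for serializing the current metric dump on the dedicated thread.
		return executor.submit(() -> "numRecordsIn=42");
	}

	public void shutdown() {
		executor.shutdownNow();
	}

	public static void main(String[] args) throws Exception {
		IsolatedMetricService service = new IsolatedMetricService();
		System.out.println(service.queryMetrics().get()); // prints "numRecordsIn=42"
		service.shutdown();
	}
}
```

The point of the dedicated pool (or, in the PR, a dedicated actor system) is that a stalled metric query degrades only metrics reporting, never job coordination.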
[jira] [Commented] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity
[ https://issues.apache.org/jira/browse/FLINK-5542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641990#comment-16641990 ]

ASF GitHub Bot commented on FLINK-5542:

GJL commented on issue #6775: [FLINK-5542] use YarnCluster vcores setting to do MaxVCore validation
URL: https://github.com/apache/flink/pull/6775#issuecomment-427871290

LGTM, merging as soon as the build is green.

> YARN client incorrectly uses local YARN config to check vcore capacity
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5542
>                 URL: https://issues.apache.org/jira/browse/FLINK-5542
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.1.4, 1.5.3, 1.6.0, 1.7.0
>            Reporter: Shannon Carey
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html
> When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 1.1.4 compares the user's selected number of vcores to the vcores configured in the local node's YARN config (from YarnConfiguration, e.g. yarn-site.xml and yarn-default.xml). It incorrectly prevents Flink from launching even if there is sufficient vcore capacity on the cluster.
> That is not correct, because the application will not necessarily run on the local node. For example, if running the yarn-session.sh client from the AWS EMR master node, the vcore count there may differ from the vcore count on the core nodes where Flink will actually run.
> A reasonable way to fix this would probably be to reuse the logic from "yarn-session.sh -q" (FlinkYarnSessionCli line 550), which knows how to get vcore information from the real worker nodes. Alternatively, perhaps we could remove the check entirely and rely on YARN's Scheduler to determine whether sufficient resources exist.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
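The fix direction described in the issue (validate against what the worker nodes actually offer, as "yarn-session.sh -q" reports, rather than the client's local yarn-site.xml) boils down to a max-over-nodes check. A minimal sketch, where the `List<Integer>` of per-node vcores is a stand-in for the node reports the ResourceManager returns:

```java
import java.util.List;

public class VcoreCheck {

	// Validate a requested vcore count against the per-node capacities of the
	// cluster, not the local YARN configuration. The node list is a stand-in
	// for the ResourceManager's node reports; the message text is illustrative.
	public static void validateVcores(int requestedVcores, List<Integer> nodeVcores) {
		int clusterMax = nodeVcores.stream().mapToInt(Integer::intValue).max().orElse(0);
		if (requestedVcores > clusterMax) {
			throw new IllegalArgumentException(
				"Requested " + requestedVcores + " vcores, but the largest node in the cluster offers only "
					+ clusterMax + ".");
		}
	}
}
```

Under this check, a client running on an EMR master with few vcores no longer rejects a session that the larger core nodes could actually host.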
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641964#comment-16641964 ]

ASF GitHub Bot commented on FLINK-10399:

GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

(The comment quotes the same @@ -68,81 +68,46 @@ hunk of `fromArgs` shown earlier in this thread; the review comment text is cut off in this archive copy.)
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641962#comment-16641962 ] ASF GitHub Bot commented on FLINK-10399: GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#discussion_r223391917 ## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java ## @@ -68,81 +68,46 @@ * @return A {@link ParameterTool} */ public static ParameterTool fromArgs(String[] args) { - Map map = new HashMap(args.length / 2); - - String key = null; - String value = null; - boolean expectValue = false; - for (String arg : args) { - // check for -- argument - if (arg.startsWith("--")) { - if (expectValue) { - // we got into a new key, even though we were a value --> current key is one without value - if (value != null) { - throw new IllegalStateException("Unexpected state"); - } - map.put(key, NO_VALUE_KEY); - // key will be overwritten in the next step - } - key = arg.substring(2); - expectValue = true; - } // check for - argument - else if (arg.startsWith("-")) { - // we are waiting for a value, so this is a - prefixed value (negative number) - if (expectValue) { - - if (NumberUtils.isNumber(arg)) { - // negative number - value = arg; - expectValue = false; - } else { - if (value != null) { - throw new IllegalStateException("Unexpected state"); - } - // We waited for a value but found a new key. So the previous key doesnt have a value. 
- map.put(key, NO_VALUE_KEY); - key = arg.substring(1); - expectValue = true; - } - } else { - // we are not waiting for a value, so its an argument - key = arg.substring(1); - expectValue = true; - } + final Map map = new HashMap<>(args.length / 2); + + final String errorMessage = "Error parsing arguments '%s' on '%s'."; + + int i = 0; + while (i < args.length) { + final String key; + + if (args[i].startsWith("--")) { + key = args[i].substring(2); + } else if (args[i].startsWith("-")) { + key = args[i].substring(1); } else { - if (expectValue) { - value = arg; - expectValue = false; - } else { - throw new RuntimeException("Error parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected value. Please prefix values with -- or -."); - } + throw new IllegalArgumentException( + String.format(errorMessage + " Please prefix keys with -- or -.", + Arrays.toString(args), args[i])); } - if (value == null && key == null) { - throw new IllegalStateException("Value and key can not be null at the same time"); - } - if (key != null && value == null && !expectValue) { - throw new IllegalStateException("Value expected but flag not set"); - } - if (key != null && value != null) { - map.put(key, value); -
[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

```
@@ -68,81 +68,46 @@
 	 * @return A {@link ParameterTool}
 	 */
 	public static ParameterTool fromArgs(String[] args) {
-		Map map = new HashMap(args.length / 2);
-
-		String key = null;
-		String value = null;
-		boolean expectValue = false;
-		for (String arg : args) {
-			// check for -- argument
-			if (arg.startsWith("--")) {
-				if (expectValue) {
-					// we got into a new key, even though we were a value --> current key is one without value
-					if (value != null) {
-						throw new IllegalStateException("Unexpected state");
-					}
-					map.put(key, NO_VALUE_KEY);
-					// key will be overwritten in the next step
-				}
-				key = arg.substring(2);
-				expectValue = true;
-			} // check for - argument
-			else if (arg.startsWith("-")) {
-				// we are waiting for a value, so this is a - prefixed value (negative number)
-				if (expectValue) {
-
-					if (NumberUtils.isNumber(arg)) {
-						// negative number
-						value = arg;
-						expectValue = false;
-					} else {
-						if (value != null) {
-							throw new IllegalStateException("Unexpected state");
-						}
-						// We waited for a value but found a new key. So the previous key doesnt have a value.
-						map.put(key, NO_VALUE_KEY);
-						key = arg.substring(1);
-						expectValue = true;
-					}
-				} else {
-					// we are not waiting for a value, so its an argument
-					key = arg.substring(1);
-					expectValue = true;
-				}
+		final Map map = new HashMap<>(args.length / 2);
+
+		final String errorMessage = "Error parsing arguments '%s' on '%s'.";
+
+		int i = 0;
+		while (i < args.length) {
+			final String key;
+
+			if (args[i].startsWith("--")) {
+				key = args[i].substring(2);
+			} else if (args[i].startsWith("-")) {
+				key = args[i].substring(1);
 			} else {
-				if (expectValue) {
-					value = arg;
-					expectValue = false;
-				} else {
-					throw new RuntimeException("Error parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected value. Please prefix values with -- or -.");
-				}
+				throw new IllegalArgumentException(
+					String.format(errorMessage + " Please prefix keys with -- or -.",
+						Arrays.toString(args), args[i]));
 			}
-			if (value == null && key == null) {
-				throw new IllegalStateException("Value and key can not be null at the same time");
-			}
-			if (key != null && value == null && !expectValue) {
-				throw new IllegalStateException("Value expected but flag not set");
-			}
-			if (key != null && value != null) {
-				map.put(key, value);
-				key = null;
-				value = null;
-				expectValue = false;
-			}
-			if (key != null && key.length() == 0) {
-				throw new IllegalArgumentException("The input " + Arrays.toString(args) + " contains an empty argument");
```
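For readers following the thread, the new control flow in the diff above can be run in isolation. The following is a hedged sketch, not the actual ParameterTool: the class name and the `NO_VALUE_KEY` placeholder value are assumptions, a plain regex stands in for `NumberUtils.isNumber`, and the value-handling half of the loop (truncated in the quoted diff) is reconstructed here from the old code's apparent semantics.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ArgsParserSketch {

	// Stand-in for ParameterTool.NO_VALUE_KEY (hypothetical placeholder value).
	static final String NO_VALUE_KEY = "__NO_VALUE_KEY";

	static Map<String, String> fromArgs(String[] args) {
		final Map<String, String> map = new HashMap<>(args.length / 2);

		int i = 0;
		while (i < args.length) {
			final String key;
			if (args[i].startsWith("--")) {
				key = args[i].substring(2);
			} else if (args[i].startsWith("-")) {
				key = args[i].substring(1);
			} else {
				throw new IllegalArgumentException(String.format(
					"Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
					Arrays.toString(args), args[i]));
			}
			if (key.isEmpty()) {
				throw new IllegalArgumentException(
					String.format("The input %s contains an empty argument", Arrays.toString(args)));
			}

			i += 1; // advance past the key

			// A key is followed by a value unless the next token is itself a key:
			// a token starting with '-' that is not a negative number.
			if (i >= args.length) {
				map.put(key, NO_VALUE_KEY);           // trailing flag without value
			} else if (isNegativeNumber(args[i])) {
				map.put(key, args[i]);                // '-' prefixed negative number is a value
				i += 1;
			} else if (args[i].startsWith("-")) {
				map.put(key, NO_VALUE_KEY);           // next token is a new key; do not consume it
			} else {
				map.put(key, args[i]);
				i += 1;
			}
		}
		return map;
	}

	// Simplified stand-in for org.apache.commons.lang3.math.NumberUtils.isNumber.
	static boolean isNegativeNumber(String token) {
		return token.matches("-\\d+(\\.\\d+)?");
	}

	public static void main(String[] args) {
		Map<String, String> map =
			fromArgs(new String[]{"--input", "in.txt", "-threshold", "-0.5", "--verbose"});
		System.out.println(map.get("input"));     // in.txt
		System.out.println(map.get("threshold")); // -0.5
		System.out.println(map.get("verbose"));   // __NO_VALUE_KEY
	}
}
```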
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641957#comment-16641957 ]

ASF GitHub Bot commented on FLINK-10399:

GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223311299

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

```
+		final String errorMessage = "Error parsing arguments '%s' on '%s'.";
```

Review comment: There are no tests asserting that `Error parsing arguments [...]` is part of the error message. IMO the test `testIllegalArgs` should be rewritten as follows:

```
@Test
public void testThrowExceptionIfParameterIsNotPrefixed() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Error parsing arguments '[a]' on 'a'. Please prefix keys with -- or -.");

	ParameterTool.fromArgs(new String[]{"a"});
}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Refactor ParameterTool#fromArgs
> -------------------------------
>
>                 Key: FLINK-10399
>                 URL: https://issues.apache.org/jira/browse/FLINK-10399
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink developers struggle to parse quickly.
> The main problem is that, although parsing args always means reading key-value pairs, the implementation iterates with a {{for}} loop, which forces awkward flags, mutable variables, and branches.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
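The same message check can also be made without JUnit's `ExpectedException` rule. Below is a minimal plain-Java sketch; the helper `assertThrowsWithMessage` and its class are hypothetical, not JUnit API (JUnit 4.13+/5 provide a similar `assertThrows`), and the thrown exception stands in for the real `ParameterTool.fromArgs` call.

```java
public class ExpectMessageSketch {

	// Hypothetical helper: runs the action and verifies both the exception type
	// and that the message contains the expected fragment.
	static <T extends Throwable> T assertThrowsWithMessage(
			Class<T> expectedType, String expectedFragment, Runnable action) {
		try {
			action.run();
		} catch (Throwable t) {
			if (!expectedType.isInstance(t)) {
				throw new AssertionError("Expected " + expectedType.getName() + " but got " + t, t);
			}
			if (t.getMessage() == null || !t.getMessage().contains(expectedFragment)) {
				throw new AssertionError("Unexpected message: " + t.getMessage(), t);
			}
			return expectedType.cast(t);
		}
		throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
	}

	public static void main(String[] args) {
		// Mirrors the suggested test: an unprefixed argument should be rejected
		// with the exact parse-error message. The lambda stands in for
		// ParameterTool.fromArgs(new String[]{"a"}).
		IllegalArgumentException e = assertThrowsWithMessage(
			IllegalArgumentException.class,
			"Please prefix keys with -- or -.",
			() -> {
				throw new IllegalArgumentException(
					"Error parsing arguments '[a]' on 'a'. Please prefix keys with -- or -.");
			});
		System.out.println(e.getMessage());
	}
}
```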
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641958#comment-16641958 ]

ASF GitHub Bot commented on FLINK-10399:

GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223348499

## File path: flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java

```
@@ -52,30 +52,29 @@ protected void validate(ParameterTool parameter) {
 		} catch (Exception e) {
 			fail();
```

Review comment: Since you are already cleaning up: the test will fail anyway if an exception is propagated, so there is no need to catch it and fail explicitly. Also, we are currently masking the root cause.
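The masking the comment describes can be demonstrated in isolation. A hedged sketch (class and method names are made up; `parse` stands in for whatever the test exercises):

```java
public class RootCauseSketch {

	// Stand-in for the code under test: fails with a descriptive exception.
	static void parse(String badInput) {
		throw new IllegalStateException("cannot parse: " + badInput);
	}

	// Anti-pattern mirrored from the test: the root cause is swallowed and
	// replaced by a bare AssertionError that carries no cause, so the test
	// report shows only "fail()" instead of the real parse failure.
	static void validateCatchAndFail() {
		try {
			parse("###");
		} catch (Exception e) {
			throw new AssertionError("fail()"); // original exception is lost here
		}
	}

	// Preferred: let the exception propagate; the test runner then reports
	// the full stack trace of the actual failure.
	static void validatePropagate() {
		parse("###");
	}

	public static void main(String[] args) {
		try {
			validateCatchAndFail();
		} catch (AssertionError e) {
			System.out.println("cause preserved: " + (e.getCause() != null)); // false
		}
	}
}
```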
[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641955#comment-16641955 ]

ASF GitHub Bot commented on FLINK-10399:

GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223394447

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

```
+		int i = 0;
+		while (i < args.length) {
+			final String key;
```

Review comment: Optional: the function could benefit from an _extract method_ refactoring:

```
final String key = getNextKey(args, i);
```

```
private static String getNextKey(final String[] args, final int index) {
	final String arg = args[index];
	final String key;

	if (arg.startsWith("--")) {
		key = arg.substring(2);
	} else if (arg.startsWith("-")) {
		key = arg.substring(1);
	} else {
		throw new IllegalArgumentException(
			String.format("Error parsing arguments '%s' on '%s'."
				+ " Please prefix keys with -- or -.", Arrays.toString(args), arg));
	}

	if (key.length() == 0) {
		throw new IllegalArgumentException(
			String.format("The input %s contains an empty argument", Arrays.toString(args)));
	}

	return key;
}
```
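Standalone, the suggested helper behaves as follows. This sketch wraps the `getNextKey` body from the comment above in a hypothetical class so it can be exercised directly; `private` is dropped only so the demo can call it.

```java
import java.util.Arrays;

public class GetNextKeySketch {

	// The extracted helper, as proposed in the review comment.
	static String getNextKey(final String[] args, final int index) {
		final String arg = args[index];
		final String key;

		if (arg.startsWith("--")) {
			key = arg.substring(2);
		} else if (arg.startsWith("-")) {
			key = arg.substring(1);
		} else {
			throw new IllegalArgumentException(
				String.format("Error parsing arguments '%s' on '%s'."
					+ " Please prefix keys with -- or -.", Arrays.toString(args), arg));
		}

		if (key.length() == 0) {
			throw new IllegalArgumentException(
				String.format("The input %s contains an empty argument", Arrays.toString(args)));
		}

		return key;
	}

	public static void main(String[] args) {
		System.out.println(getNextKey(new String[]{"--input", "file"}, 0)); // input
		System.out.println(getNextKey(new String[]{"-v"}, 0));              // v

		try {
			getNextKey(new String[]{"a"}, 0);
		} catch (IllegalArgumentException e) {
			// Error parsing arguments '[a]' on 'a'. Please prefix keys with -- or -.
			System.out.println(e.getMessage());
		}
	}
}
```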
[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#discussion_r223394447 ## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java ## @@ -68,81 +68,46 @@ * @return A {@link ParameterTool} */ public static ParameterTool fromArgs(String[] args) { - Map map = new HashMap(args.length / 2); - - String key = null; - String value = null; - boolean expectValue = false; - for (String arg : args) { - // check for -- argument - if (arg.startsWith("--")) { - if (expectValue) { - // we got into a new key, even though we were a value --> current key is one without value - if (value != null) { - throw new IllegalStateException("Unexpected state"); - } - map.put(key, NO_VALUE_KEY); - // key will be overwritten in the next step - } - key = arg.substring(2); - expectValue = true; - } // check for - argument - else if (arg.startsWith("-")) { - // we are waiting for a value, so this is a - prefixed value (negative number) - if (expectValue) { - - if (NumberUtils.isNumber(arg)) { - // negative number - value = arg; - expectValue = false; - } else { - if (value != null) { - throw new IllegalStateException("Unexpected state"); - } - // We waited for a value but found a new key. So the previous key doesnt have a value. 
- map.put(key, NO_VALUE_KEY); - key = arg.substring(1); - expectValue = true; - } - } else { - // we are not waiting for a value, so its an argument - key = arg.substring(1); - expectValue = true; - } + final Map map = new HashMap<>(args.length / 2); + + final String errorMessage = "Error parsing arguments '%s' on '%s'."; + + int i = 0; + while (i < args.length) { + final String key; Review comment: Optional: The function could benefit from _extract method_ refactoring: ``` final String key = getNextKey(args, i); ``` ``` private static String getNextKey(final String[] args, final int index) { final String arg = args[index]; final String key; if (arg.startsWith("--")) { key = arg.substring(2); } else if (arg.startsWith("-")) { key = arg.substring(1); } else { throw new IllegalArgumentException( String.format("Error parsing arguments '%s' on '%s'." + " Please prefix keys with -- or -.", Arrays.toString(args), arg)); } if (key.length() == 0) { throw new IllegalArgumentException( String.format("The input %s contains an empty argument", Arrays.toString(args))); } return key; } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#discussion_r223348499 ## File path: flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java ## @@ -52,30 +52,29 @@ protected void validate(ParameterTool parameter) { } catch (Exception e) { fail(); Review comment: Since you are already cleaning up: The test will fail if an exception is propagated. There is no need to to catch it and fail explicitly. Also we are currently masking the root cause. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#discussion_r223391917 ## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java ## @@ -68,81 +68,46 @@ * @return A {@link ParameterTool} */ public static ParameterTool fromArgs(String[] args) { - Map map = new HashMap(args.length / 2); - - String key = null; - String value = null; - boolean expectValue = false; - for (String arg : args) { - // check for -- argument - if (arg.startsWith("--")) { - if (expectValue) { - // we got into a new key, even though we were a value --> current key is one without value - if (value != null) { - throw new IllegalStateException("Unexpected state"); - } - map.put(key, NO_VALUE_KEY); - // key will be overwritten in the next step - } - key = arg.substring(2); - expectValue = true; - } // check for - argument - else if (arg.startsWith("-")) { - // we are waiting for a value, so this is a - prefixed value (negative number) - if (expectValue) { - - if (NumberUtils.isNumber(arg)) { - // negative number - value = arg; - expectValue = false; - } else { - if (value != null) { - throw new IllegalStateException("Unexpected state"); - } - // We waited for a value but found a new key. So the previous key doesnt have a value. 
- map.put(key, NO_VALUE_KEY); - key = arg.substring(1); - expectValue = true; - } - } else { - // we are not waiting for a value, so its an argument - key = arg.substring(1); - expectValue = true; - } + final Map map = new HashMap<>(args.length / 2); + + final String errorMessage = "Error parsing arguments '%s' on '%s'."; + + int i = 0; + while (i < args.length) { + final String key; + + if (args[i].startsWith("--")) { + key = args[i].substring(2); + } else if (args[i].startsWith("-")) { + key = args[i].substring(1); } else { - if (expectValue) { - value = arg; - expectValue = false; - } else { - throw new RuntimeException("Error parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected value. Please prefix values with -- or -."); - } + throw new IllegalArgumentException( + String.format(errorMessage + " Please prefix keys with -- or -.", + Arrays.toString(args), args[i])); } - if (value == null && key == null) { - throw new IllegalStateException("Value and key can not be null at the same time"); - } - if (key != null && value == null && !expectValue) { - throw new IllegalStateException("Value expected but flag not set"); - } - if (key != null && value != null) { - map.put(key, value); - key = null; - value = null; - expectValue = false; - } - if (key != null && key.length() == 0) { - throw new IllegalArgume
[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#discussion_r223311299

## File path: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java

```diff
+		final String errorMessage = "Error parsing arguments '%s' on '%s'.";
```

Review comment: There are no tests asserting that `Error parsing arguments [...]` is part of the error message.
IMO the test `testIllegalArgs` should be rewritten as follows:

```java
@Test
public void testThrowExceptionIfParameterIsNotPrefixed() {
	exception.expect(IllegalArgumentException.class);
	exception.expectMessage("Error parsing arguments '[a]' on 'a'. Please prefix keys with -- or -.");
	ParameterTool.fromArgs(new String[]{"a"});
}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
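GJL's point is that both the exception type and the exact message text should be asserted. The same check can be sketched without the JUnit `ExpectedException` rule, in plain Java; the `parse` method below is a hypothetical stand-in that reproduces only the message format under discussion, not `ParameterTool.fromArgs` itself:

```java
// Plain-Java sketch of the assertion the suggested JUnit test performs:
// trigger the failure with an unprefixed token and capture the message.
public class ErrorMessageCheck {
	// hypothetical stand-in: rejects any token not prefixed with "-" or "--"
	static void parse(String[] args) {
		for (String arg : args) {
			if (!arg.startsWith("-")) {
				throw new IllegalArgumentException(String.format(
					"Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
					java.util.Arrays.toString(args), arg));
			}
		}
	}

	// returns the exception message, or null if parsing succeeded
	static String messageFor(String[] args) {
		try {
			parse(args);
			return null;
		} catch (IllegalArgumentException e) {
			return e.getMessage();
		}
	}
}
```

Asserting on the full message (rather than only the exception type) is what makes the test catch regressions in the `String.format` template.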
[jira] [Created] (FLINK-10514) change Tachyon to Alluxio
yangxiaoshuo created FLINK-10514:
--------------------------------

     Summary: change Tachyon to Alluxio
         Key: FLINK-10514
         URL: https://issues.apache.org/jira/browse/FLINK-10514
     Project: Flink
  Issue Type: Improvement
  Components: Streaming
    Reporter: yangxiaoshuo

Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641859#comment-16641859 ]

ASF GitHub Bot commented on FLINK-10156:
----------------------------------------

fhueske opened a new pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805

## What is the purpose of the change

* Deprecate `Table.writeToSink()` as discussed in FLINK-10156

## Brief change log

* Deprecate `Table.writeToSink()`
* Port usage of `writeToSink()` to `insertInto()`

## Verifying this change

* run existing tests

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? Docs have been adjusted.

> Drop the Table.writeToSink() method
> -----------------------------------
>
>         Key: FLINK-10156
>         URL: https://issues.apache.org/jira/browse/FLINK-10156
>     Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>    Reporter: Fabian Hueske
>    Assignee: Fabian Hueske
>    Priority: Major
>      Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a table into a table that was previously registered with a {{TableSink}} in the catalog. It is the inverse method to the {{scan()}} method and the equivalent to an {{INSERT INTO ... SELECT}} SQL query.
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks (and TableSources) should only be registered with the {{TableEnvironment}} and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is more aligned with SQL.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
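Fabian's reasons 2 and 3 — sinks registered once with the environment rather than handed into the query as instances — can be illustrated with a small toy model. This is an illustration of the catalog pattern only; the class and method names below are hypothetical stand-ins, not Flink's actual `TableEnvironment`/`Table` API:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the catalog-based pattern behind insertInto():
// sinks are registered under a name with the environment, and the
// "query part" only refers to that name, mirroring INSERT INTO ... SELECT.
public class CatalogSketch {
	interface Sink { void emit(String row); }

	static class Env {
		private final Map<String, Sink> catalog = new HashMap<>();

		// analogous role to registering a TableSink in the catalog
		void registerTableSink(String name, Sink sink) {
			catalog.put(name, sink);
		}

		Sink lookup(String name) {
			Sink s = catalog.get(name);
			if (s == null) {
				throw new IllegalArgumentException("No sink registered as '" + name + "'");
			}
			return s;
		}
	}

	static class Table {
		private final Env env;
		private final java.util.List<String> rows;

		Table(Env env, java.util.List<String> rows) {
			this.env = env;
			this.rows = rows;
		}

		// the query names the target; the Sink instance stays in the catalog
		void insertInto(String sinkName) {
			Sink sink = env.lookup(sinkName);
			for (String row : rows) {
				sink.emit(row);
			}
		}
	}
}
```

Compared with a `writeToSink(sink)` style call, the query code here never touches the `Sink` object, which is the separation of catalog and query that the issue argues is closer to SQL semantics.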
[jira] [Updated] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-10156:
-----------------------------------
    Labels: pull-request-available  (was: )

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] fhueske opened a new pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
fhueske opened a new pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805