[jira] [Commented] (FLINK-10474) Don't translate IN to JOIN with VALUES for streaming queries

2018-10-08 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642878#comment-16642878
 ] 

Piotr Nowojski commented on FLINK-10474:


Sorry for jumping into the discussion a bit late, but I would like to point out 
a couple of drawbacks of the first approach:
 # It is less general. The second option would/could cover more cases, like IN 
queries with a bounded table (not VALUES) and JOINs with bounded tables (or 
VALUES). JOINs with bounded tables are something users have been asking for and 
something that we would like to have. If we go with the first approach now, it 
will be wasted effort once bounded JOINs are implemented.
 # It adds complexity. Despite perhaps being easier to implement, it doesn't 
add new features to Flink, while increasing code complexity with code that 
handles only a special case.
 # It will complicate the planning logic and further diverge streaming plans 
from batch plans. This again raises the complexity of the project (more moving 
parts, more things one has to consider when analysing planning results).

> Don't translate IN to JOIN with VALUES for streaming queries
> 
>
> Key: FLINK-10474
> URL: https://issues.apache.org/jira/browse/FLINK-10474
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> IN clauses are translated to JOIN with VALUES if the number of elements in 
> the IN clause exceeds a certain threshold. This should not be done, because a 
> streaming join is very heavy and materializes both inputs (which is fine for 
> the VALUES input but not for the other).
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but have a special join strategy if one input is 
> bounded and final (non-updating)
> Option 1. should be easy to do; option 2. requires much more effort.
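To make the issue concrete, here is a minimal sketch in Flink's Java Table API; the table name, column name, and the {{tEnv}} handle are invented for illustration, and the exact element threshold is a planner setting not stated here:

{noformat}
// a streaming query whose IN list exceeds the planner's threshold
Table result = tEnv.sqlQuery(
	"SELECT * FROM clicks WHERE user_id IN (" +
	"1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)");

// conceptually, the planner currently rewrites this to:
//   SELECT c.* FROM clicks c
//   JOIN (VALUES (1), (2), ..., (21)) AS v(user_id) ON c.user_id = v.user_id
// and a streaming join materializes BOTH inputs -- fine for the VALUES side,
// but not for the unbounded clicks side.
{noformat}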





[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642875#comment-16642875
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223575788
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java

 @@ -115,8 +115,12 @@ public boolean isRegisteredAsAvailable() {
	@Override
	public boolean isAvailable() {
		// BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)!
-		return hasBuffersAvailable() &&
-			(numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
+		return hasBuffersAvailable() && !isBlocked();
+	}
+
+	@Override
+	public boolean isBlocked() {
 
 Review comment:
   I think the `isBlocked` name does not convey the specific semantics. How 
about changing it to `isCreditsAvailable()` directly?
   
   That way, `isAvailable()` breaks down into `buffersAvailable()` and 
`creditsAvailable()`, and the following private method 
`isAvailable(BufferAndBacklog bufferAndBacklog)` could also reuse 
`isCreditsAvailable()`.
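
A minimal sketch of the suggested split, reusing the fields visible in the diff above (`numCreditsAvailable`, `subpartitionView`); this illustrates the review suggestion, not the PR's actual change:

```
@Override
public boolean isAvailable() {
	// BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)!
	return hasBuffersAvailable() && isCreditsAvailable();
}

private boolean isCreditsAvailable() {
	// credits gate regular data buffers; events may be sent even without credits
	return numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent();
}
```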




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642866#comment-16642866
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223574052
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java

 @@ -119,6 +141,22 @@ private void enqueueAvailableReader(final NetworkSequenceViewReader reader) thro
		}
	}

+	private void flushReaders(long flushTimeout) throws Exception {
+		List<NetworkSequenceViewReader> readersToFlush = periodicFlushes.getReaders(flushTimeout);
+
+		boolean wasEmpty = availableReaders.isEmpty();
+
+		for (NetworkSequenceViewReader reader : readersToFlush) {
+			if (!reader.isRegisteredAsAvailable() && !reader.isBlocked()) {
 
 Review comment:
   I think the point of the flush is: if this reader has both an unfinished 
`BufferConsumer` and credits, we still want to transport that buffer to reduce 
latency. So the condition for an available reader should cover both available 
buffers and available credits. But the current condition only confirms that the 
reader has available credits before inserting it into the queue. If we then 
poll this reader from the queue and find that the next buffer is null, it does 
not make sense to have registered the reader as available here.
   
   So is it reasonable to adjust the condition here to confirm that this reader 
has both credits and unfinished buffers?
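
A hedged sketch of the adjusted condition the comment asks for, reusing the reader methods from the diffs above (`registerAvailableReader` is assumed to be the existing enqueue helper in this class):

```
for (NetworkSequenceViewReader reader : readersToFlush) {
	// enqueue only readers that have both credits and data to send,
	// so a reader polled from the queue never comes up empty
	if (!reader.isRegisteredAsAvailable() && reader.isAvailable()) {
		registerAvailableReader(reader);
	}
}
```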




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642854#comment-16642854
 ] 

ASF GitHub Bot commented on FLINK-10135:


yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-428072796
 
 
   @zentol Thanks for your suggestion. I have updated the implementation; 
please review it again.




> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.







[jira] [Commented] (FLINK-10126) There should be a Scala DataSource

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642852#comment-16642852
 ] 

ASF GitHub Bot commented on FLINK-10126:


yanghua commented on a change in pull request #6738: [FLINK-10126] There should 
be a Scala DataSource
URL: https://github.com/apache/flink/pull/6738#discussion_r223569915
 
 

 File path: pom.xml

 @@ -1613,6 +1613,7 @@ under the License.
	org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
	org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
	org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+	org.apache.flink.api.scala.ExecutionEnvironment
 
 Review comment:
   I temporarily added it to the exclusion list so that Travis can continue 
compiling and verifying other changes. cc @tillrohrmann and @StephanEwen 




> There should be a Scala DataSource
> --
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexis Sarda-Espinosa
>Assignee: vinoyang
>Priority: Minor
>  Labels: datasource, pull-request-available, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, 
> whereas the Scala version returns a DataSet. There is no Scala DataSource 
> wrapper, and the Scala DataSet does not provide the Java DataSource methods, 
> such as getSplitDataProperties.
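
A short Java sketch of the asymmetry described above (the input path is invented; the {{getSplitDataProperties()}} call is the part with no Scala counterpart):

{noformat}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.Path;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Java: createInput returns a DataSource ...
DataSource<String> source = env.createInput(
	new TextInputFormat(new Path("/tmp/input")), BasicTypeInfo.STRING_TYPE_INFO);
// ... which exposes split properties; the Scala API only hands back a DataSet,
// so this call is unreachable there.
SplitDataProperties<String> props = source.getSplitDataProperties();
{noformat}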







[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642850#comment-16642850
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

yanghua commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r223569543
 
 

 File path: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java

 @@ -823,12 +823,29 @@ public JobExecutionResult execute() throws Exception {
	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
	 *
-	 * The program execution will be logged and displayed with the given job name.
+	 * The program execution will be logged and displayed with the given job name and a generated
+	 * default description.
	 *
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 * @throws Exception Thrown, if the program executions fails.
	 */
-	public abstract JobExecutionResult execute(String jobName) throws Exception;
+	public JobExecutionResult execute(String jobName) throws Exception {
+		return execute(jobName, "");
+	}
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of the program that have
+	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+	 * writing results (e.g. {@link DataSet#writeAsText(String)},
+	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+	 *
+	 * The program execution will be logged and displayed with the given job name and description.
+	 *
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception Thrown, if the program executions fails.
+	 */
+	public abstract JobExecutionResult execute(String jobName, String jobDescription) throws Exception;
 
 Review comment:
   I have solved some compatibility issues; now only this fundamental problem 
remains. cc @dawidwys and @tillrohrmann @aljoscha 




> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.
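
A hedged usage sketch of the proposed two-argument {{execute}} from the PR diff above (job name and description values are invented):

{noformat}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... define sources, transformations, and sinks ...
// the name stays stable for metrics; the description carries the version info
env.execute("user-sessionizer", "build 2018-10-08, release-candidate-2");
{noformat}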







[jira] [Commented] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642848#comment-16642848
 ] 

vinoyang commented on FLINK-10514:
--

Hi [~yangxiaoshuo], usually you need to send an email to the dev mailing list 
to request JIRA contribution permission. I will ping [~till.rohrmann] and 
[~Zentol] to give you JIRA permission; once you have it, you can claim the 
issue yourself.

> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Assignee: Guibo Pan
>Priority: Minor
>  Labels: pull-request-available
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.





[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642790#comment-16642790
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223557953
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java

 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
			}
		}
	}
+
+	private static class RegisterPeriodicFlushEvent {
+		private final NetworkSequenceViewReader reader;
+		private final long flushTimeout;
+
+		public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+			this.reader = checkNotNull(reader);
+			this.flushTimeout = flushTimeout;
+		}
+
+		public NetworkSequenceViewReader getReader() {
+			return reader;
+		}
+
+		public long getFlushTimeout() {
+			return flushTimeout;
+		}
+	}
+
+	private static class PeriodicFlushes {
+		private final Map> periodicFlushes = new HashMap<>();
+		private final Map flushTimeouts = new HashMap<>();
+		private final Map> scheduledEvents = new HashMap<>();
+
+		public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) {
 
 Review comment:
   remove `public` for all the methods in this inner class?




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642779#comment-16642779
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223556150
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java

 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
			}
		}
	}
+
+	private static class RegisterPeriodicFlushEvent {
+		private final NetworkSequenceViewReader reader;
+		private final long flushTimeout;
+
+		public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+			this.reader = checkNotNull(reader);
+			this.flushTimeout = flushTimeout;
+		}
+
+		public NetworkSequenceViewReader getReader() {
+			return reader;
+		}
+
+		public long getFlushTimeout() {
+			return flushTimeout;
+		}
+	}
+
+	private static class PeriodicFlushes {
 
 Review comment:
   Should we clear the related maps and cancel the futures in 
`releaseAllResources`, which is triggered by `exceptionCaught` or 
`channelInactive`?
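
A minimal sketch of the cleanup being asked about; `scheduledFutures` is a hypothetical stand-in for wherever the scheduled flush tasks are held, since the map generics were lost in this archive:

```
void releaseAllResources() {
	// cancel outstanding scheduled flush tasks before dropping the bookkeeping
	for (ScheduledFuture<?> future : scheduledFutures) {
		future.cancel(false);
	}
	scheduledFutures.clear();
	periodicFlushes.clear();
	flushTimeouts.clear();
	scheduledEvents.clear();
}
```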




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-10472) Add CBRT math function supported in Table API and SQL

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642767#comment-16642767
 ] 

ASF GitHub Bot commented on FLINK-10472:


Guibo-Pan commented on issue #6783: [FLINK-10472] [table] Add CBRT math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6783#issuecomment-428051714
 
 
   @aai95 From my side, I wonder whether we are going to go on adding "the 
fourth root", "the fifth root" and so on, or add a standard "nth root" function 
instead. In that case, I think consistency should be taken into consideration. 
It's just an opinion; feel free to keep this as is, or discuss further if you 
disagree.
   Thanks.




> Add CBRT math function supported in Table API and SQL
> -
>
> Key: FLINK-10472
> URL: https://issues.apache.org/jira/browse/FLINK-10472
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Aleksei Izmalkin
>Assignee: Aleksei Izmalkin
>Priority: Minor
>  Labels: pull-request-available
>
> Implement the function to calculate the cube root.
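
For reference, the JDK already provides the underlying computation, so a {{CBRT(x)}} implementation can delegate to it (a sketch, not the PR's code):

{noformat}
double root = Math.cbrt(27.0); // 3.0
{noformat}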







[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642757#comment-16642757
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223553034
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java

 @@ -354,4 +404,64 @@ public void operationComplete(ChannelFuture future) throws Exception {
			}
		}
	}
+
+	private static class RegisterPeriodicFlushEvent {
+		private final NetworkSequenceViewReader reader;
+		private final long flushTimeout;
+
+		public RegisterPeriodicFlushEvent(NetworkSequenceViewReader reader, long flushTimeout) {
+			this.reader = checkNotNull(reader);
+			this.flushTimeout = flushTimeout;
+		}
+
+		public NetworkSequenceViewReader getReader() {
+			return reader;
+		}
+
+		public long getFlushTimeout() {
+			return flushTimeout;
+		}
+	}
+
+	private static class PeriodicFlushes {
+		private final Map> periodicFlushes = new HashMap<>();
+		private final Map flushTimeouts = new HashMap<>();
+		private final Map> scheduledEvents = new HashMap<>();
+
+		public void register(ChannelHandlerContext ctx, long flushTimeout, NetworkSequenceViewReader reader) {
+			checkState(!flushTimeouts.containsKey(reader));
+			checkState(flushTimeout > 0);
 
 Review comment:
   If we check the `flushTimeout` at the beginning of the `ResultPartition` 
stack, as mentioned above, we do not need to care about it in the subsequent 
processing.
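
A hedged sketch of such an early check using Flink's `Preconditions` (the method name and exact location are assumptions; the reviewer leaves the placement open):

```
import static org.apache.flink.util.Preconditions.checkArgument;

void setupPeriodicFlush(long flushTimeout) {
	// validate once at the entry point so downstream code can assume a sane value
	checkArgument(flushTimeout >= 0, "flushTimeout must be non-negative, was %s", flushTimeout);
	// ... proceed with registration ...
}
```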




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642749#comment-16642749
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223552250
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java

 @@ -97,6 +107,18 @@ public void run() {
		});
	}

+	void registerPeriodicFlush(NetworkSequenceViewReader reader, long flushTimeout) {
+		if (flushTimeout == 0) {
 
 Review comment:
   Could this condition check be done earlier, in the `ResultPartition` or 
`ResultSubpartition` stack?




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering

2018-10-08 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642744#comment-16642744
 ] 

Biao Liu commented on FLINK-9788:
-

After checking the log file, I believe this is a critical bug. If the scenario 
below happens, the job will never recover, as the log shows.
1. A failover happens due to the loss of a TM; the ExecutionGraph tries to restart itself.
2. Restarter (1) is in ExecutionGraph.restart(), resetting all executions.
3. Another fatal error happens and triggers ExecutionGraph.failGlobal. The 
state of the ExecutionGraph is RESTARTING, so it increases the global version 
and tries to restart the ExecutionGraph too.
4. Restarter (1) fails because the global version is mismatched; some 
executions have been reset to CREATED, and some have not.
5. Since restarter (1) failed, it triggers another failGlobal without changing 
the state of the ExecutionGraph (still RESTARTING).
6. The executions reset in step 4 fail forever during restarting.

I believe the problem is that the RESTARTING state of the ExecutionGraph is not 
a safe state in which we can restart without any cancellation. Maybe we should 
perform a cancellation while the state is RESTARTING.

> ExecutionGraph Inconsistency prevents Job from recovering
> -
>
> Key: FLINK-9788
> URL: https://issues.apache.org/jira/browse/FLINK-9788
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.0
> Environment: Rev: 4a06160
> Hadoop 2.8.3
>Reporter: Gary Yao
>Priority: Critical
> Fix For: 1.7.0, 1.6.2
>
> Attachments: jobmanager_5000.log
>
>
> Deployment mode: YARN job mode with HA
> After killing many TaskManagers in succession, the state of the 
> ExecutionGraph ran into an inconsistent state, which prevented job recovery. 
> The following stacktrace was logged in the JobManager log several hundred 
> times per second:
> {noformat}
> 2018-07-08 16:47:18,855 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph 
>- Job General purpose test job (37a794195840700b98feb23e99f7ea24) 
> switched from state RESTARTING to RESTARTING.
> 2018-07-08 16:47:18,856 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting 
> the job General purpose test job (37a794195840700b98feb23e99f7ea24).
> 2018-07-08 16:47:18,857 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Resetting 
> execution vertex Source: Custom Source -> Timestamps/Watermarks (1/10) for 
> new execution.
> 2018-07-08 16:47:18,857 WARN  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Failed to 
> restart the job.
> java.lang.IllegalStateException: Cannot reset a vertex that is in 
> non-terminal state CREATED
> at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.resetForNewExecution(ExecutionVertex.java:610)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:573)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1251)
> at 
> org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
> at 
> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The resulting jobmanager log file was 4.7 GB in size. Find attached the first 
> 5000 lines of the log file. 





[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642745#comment-16642745
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223550777
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java

 @@ -77,6 +77,14 @@ public void flush() {
		}
	}

+	@Override
+	public void registerPeriodicFlush(long flushTimeout) {
 
 Review comment:
   1. Could the behavior of this method be as simple as `notifyDataAvailable`, 
so that we can reduce the `synchronized` part?
   ```
if (readView != null) {
	readView.registerPeriodicFlush(flushTimeout);
}
   ```
   2. This implementation is the same as in 
`SpillableSubpartition#registerPeriodicFlush`; how about implementing this 
method in the parent `ResultSubpartition#registerPeriodicFlush`? The relevant 
change would also mean defining a protected `readView` in `ResultSubpartition`.
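
A hedged sketch of suggestion 2, assuming `ResultSubpartition` gains a protected `readView` field as proposed (not the PR's actual code):

```
// in ResultSubpartition:
public void registerPeriodicFlush(long flushTimeout) {
	final ResultSubpartitionView view = readView; // read once; may be set concurrently
	if (view != null) {
		view.registerPeriodicFlush(flushTimeout);
	}
}
```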




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>








[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642742#comment-16642742
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223551011
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java

 @@ -106,11 +109,32 @@ protected Throwable getFailureCause() {

	abstract public void flush();

+	/**
+	 * Remote subpartitions support automatic periodic flush. This is the method to register it.
+	 * Can only by used after {@link #isLocal()} state is known.
 
 Review comment:
   by used -> be used?




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>












[jira] [Commented] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread yangxiaoshuo (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642719#comment-16642719
 ] 

yangxiaoshuo commented on FLINK-10514:
--

Thanks! I wonder how to assign an issue to myself. [~guibopan], could you give 
me some advice?

> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Assignee: Guibo Pan
>Priority: Minor
>  Labels: pull-request-available
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.





[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642714#comment-16642714
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223546433
 
 

 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java

 @@ -94,6 +96,19 @@
	/** The subpartitions of this partition. At least one. */
	private final ResultSubpartition[] subpartitions;

+	/**
+	 * Subset of {@code subpartitions} that are definitely local. We can only determine whether a
+	 * subpartition is local or not once it's read view was created.
 
 Review comment:
   it's ->its




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty

2018-10-08 Thread GitBox
zhijiangW commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r223546433
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -94,6 +96,19 @@
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
 
+   /**
+* Subset of {@code subpartitions} that are definitely local. We can 
only determine whether a
+* subpartition is local or not once it's read view was created.
 
 Review comment:
   it's -> its


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread yangxiaoshuo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yangxiaoshuo updated FLINK-10514:
-
Description: Since the *Tachyon* renamed to *Alluxio,* we should change doc 
as well.  (was: Since the *Tachyon* renameed to *Alluxio,* we should change doc 
as well.)
Summary: change Tachyon to Alluxio  (was: change Tychyon to Alluxio)

> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Assignee: Guibo Pan
>Priority: Minor
>  Labels: pull-request-available
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642687#comment-16642687
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] 
UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r223541560
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
 ##
 @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner 
getInputSplitAssigner(FileInputSplit[] splits
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
+   String bomCharsetName = getBomCharset(file);
 
 Review comment:
   Well, I first moved the logic for checking the BOM from FileInputFormat to 
DelimitedInputFormat. I want to use a Map to cache the BOM charset of each file, 
using the file name as the key and the BOM charset as the value. If an entry 
exists in the Map, the cached value is used; if it does not, the BOM charset is 
read from the file and cached.
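   
   A minimal sketch of that cache idea, assuming a hypothetical `readBomCharset` 
helper that inspects the first bytes of the file (illustrative only, not the 
actual PR code):
   ```
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BomCharsetCache {

    /** File name -> BOM charset, so each file's BOM is only read once. */
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    public String getBomCharset(Path file) throws IOException {
        String key = file.toString();
        String charset = cache.get(key);
        if (charset == null) {
            charset = readBomCharset(file);
            cache.put(key, charset);
        }
        return charset;
    }

    private static String readBomCharset(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            int b0 = in.read();
            int b1 = in.read();
            if (b0 == 0xFE && b1 == 0xFF) {
                return "UTF-16BE"; // big-endian BOM
            }
            if (b0 == 0xFF && b1 == 0xFE) {
                return "UTF-16LE"; // little-endian BOM
            }
            return "UTF-8"; // no UTF-16 BOM; fall back to a default
        }
    }
}
   ```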


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". It 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 
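
To illustrate the reconciliation rule sketched in the last paragraph of the 
description (a hypothetical helper, not existing Flink code): a BOM, when 
present, overrides the caller-supplied charset name; otherwise the caller's 
choice stands.
{code:java}
static String resolveCharset(String callerCharset, String bomCharset) {
    if (bomCharset == null) {
        return callerCharset; // no BOM: keep the caller's choice
    }
    // "UTF-16" leaves the endianness open, so the BOM decides it; and an
    // explicit caller endianness that conflicts with the BOM is overridden,
    // e.g. caller "UTF-16BE" + little-endian BOM -> "UTF-16LE".
    return bomCharset;
}
{code}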



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-10-08 Thread GitBox
XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] 
UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r223541560
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
 ##
 @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner 
getInputSplitAssigner(FileInputSplit[] splits
if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
+   String bomCharsetName = getBomCharset(file);
 
 Review comment:
   Well, I first take the logic of checking the BOM in FileInputFormat to 
DelimitedInputFormat. I want to use a Map to cache the BOM encoding of the 
file, using the filename as the key and the BOM encoding as the value. If the 
value exists in the Map, the corresponding value is read, and if the Map does 
not exist, the BOM encoding of the file is read.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10404) Declarative resource management

2018-10-08 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642640#comment-16642640
 ] 

JIN SUN commented on FLINK-10404:
-

Glad to see the design document, will take a look and comment.

> Declarative resource management
> ---
>
> Key: FLINK-10404
> URL: https://issues.apache.org/jira/browse/FLINK-10404
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> This is the umbrella issue to track the progress on implementing declarative 
> resource management.
> Declarative resource management is a change to Flink's current slot 
> allocation protocol. Instead of letting the {{JobMaster}} ask for each slot 
> individually, it will tell the {{ResourceManager}} its current need (min, 
> target, max) of slots. Based on that, the {{ResourceManager}} will assign 
> available slots or start new {{TaskExecutors}} to fulfill the request.
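
To illustrate the declared need (a purely hypothetical shape, not an actual 
Flink API):
{code:java}
// The JobMaster sends one aggregate declaration instead of requesting each
// slot individually; the ResourceManager fulfills it asynchronously.
public final class SlotRequirements {
    private final int minSlots;    // below this the job cannot run
    private final int targetSlots; // the desired parallelism
    private final int maxSlots;    // the most that is worth allocating

    public SlotRequirements(int minSlots, int targetSlots, int maxSlots) {
        this.minSlots = minSlots;
        this.targetSlots = targetSlots;
        this.maxSlots = maxSlots;
    }

    public int getMinSlots() { return minSlots; }
    public int getTargetSlots() { return targetSlots; }
    public int getMaxSlots() { return maxSlots; }
}
{code}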



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10498) Decouple ExecutionGraph from JobMaster

2018-10-08 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642635#comment-16642635
 ] 

JIN SUN commented on FLINK-10498:
-

This seems related to https://issues.apache.org/jira/browse/FLINK-10429 ?

> Decouple ExecutionGraph from JobMaster
> --
>
> Key: FLINK-10498
> URL: https://issues.apache.org/jira/browse/FLINK-10498
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> With declarative resource management we want to react to the set of available 
> resources. Thus, we need a component which is responsible for scaling the 
> {{ExecutionGraph}} accordingly. In order to better do this and separate 
> concerns, it is beneficial to introduce a {{Scheduler/ExecutionGraphDriver}} 
> component which is in charge of the {{ExecutionGraph}}. This component owns 
> the {{ExecutionGraph}} and is allowed to modify it. In the first version, 
> this component will simply accommodate all the existing logic of the 
> {{JobMaster}} and the respective {{JobMaster}} methods are forwarded to this 
> component. 
> This new component should not change the existing behaviour of Flink.
> Later this component will be in charge of announcing the required resources, 
> deciding when to rescale and executing the rescaling operation.
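
As a rough illustration of the component boundary described above (a 
hypothetical interface, for orientation only):
{code:java}
// The driver owns the ExecutionGraph; the JobMaster forwards calls to it.
interface ExecutionGraphDriver {
    void announceRequiredResources();          // declare resource needs
    boolean shouldRescale(int availableSlots); // decide when to rescale
    void rescale(int newParallelism);          // execute the rescaling
}
{code}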



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10380) Check if key is not nul before assign to group in KeyedStream

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642359#comment-16642359
 ] 

ASF GitHub Bot commented on FLINK-10380:


satybald commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-427952541
 
 
   @StephanEwen thank you for the feedback! From what I see, 
`KeyGroupRangeAssignment#assignToKeyGroup` is a utility method. It has 36 usages 
in different classes: state backends, operators, and tests. 
`AbstractKeyedStateBackend#setCurrentKey`, which uses the utility method, has 296 
usages.
   
   I'm a bit afraid that catching and testing for the NPE earlier in the chain 
would require significant effort to rewrite everything, without much benefit: 
try/catch doesn't make the code cleaner, and repeating the same error message 
everywhere would violate the DRY principle. 
   
   Also, determining whether the key is null earlier in the chain via try/catch 
rather than preconditions might lead to false positives, since a child method 
could throw an NPE for some other reason.
   
   From what I see, `computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);` 
is the place where the key group index is calculated and the NPE is thrown when 
the key is null. Would you agree that catching the NPE in the utility class and 
rethrowing it with a specific message is a viable alternative? (A minimal sketch 
follows below.)
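   
   For illustration, the precondition from the PR title could be as small as 
this (a sketch, assuming Flink's `Preconditions` utility and the existing 
`computeKeyGroupForKeyHash` method):
   ```
public static int assignToKeyGroup(Object key, int maxParallelism) {
    // Fail fast with a descriptive message instead of a bare NPE downstream.
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
   ```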


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Check if key is not nul before assign to group in KeyedStream
> -
>
> Key: FLINK-10380
> URL: https://issues.apache.org/jira/browse/FLINK-10380
> Project: Flink
>  Issue Type: Task
>Affects Versions: 1.6.0
>Reporter: Sayat Satybaldiyev
>Priority: Minor
>  Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions by a key which might be null, the 
> Flink job throws a NullPointerException at runtime. However, the NPE that Flink 
> throws is hard to debug and understand, as it doesn't refer to the place in the 
> Flink job.
> *Suggestion:*
> Add a precondition that checks that the key is not null, and throw a descriptive 
> error if it is null.
>  
> *Job Example*:
>  
> {code:java}
> DataStream<String> stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>  .map(x -> (String)null)
>  .keyBy(x -> x);{code}
>  
>  
> An error that is thrown:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110
>  INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC 
> service.
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>  at 
> org.apache.flink.runtime.io.network.api.wr

[GitHub] satybald commented on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method

2018-10-08 Thread GitBox
satybald commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-427952541
 
 
   @StephanEwen thank you for the feedback! From what I see, 
`KeyGroupRangeAssignment#assignToKeyGroup` is a utility method. It has 36 usages 
in different classes: state backends, operators, and tests. 
`AbstractKeyedStateBackend#setCurrentKey`, which uses the utility method, has 296 
usages.
   
   I'm a bit afraid that catching and testing for the NPE earlier in the chain 
would require significant effort to rewrite everything, without much benefit: 
try/catch doesn't make the code cleaner, and repeating the same error message 
everywhere would violate the DRY principle. 
   
   Also, determining whether the key is null earlier in the chain via try/catch 
rather than preconditions might lead to false positives, since a child method 
could throw an NPE for some other reason.
   
   From what I see, `computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);` 
is the place where the key group index is calculated and the NPE is thrown when 
the key is null. Would you agree that catching the NPE in the utility class and 
rethrowing it with a specific message is a viable alternative?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642336#comment-16642336
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223465173
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -909,6 +926,70 @@ private void clearDispatcherState() {
terminateJobManagerRunners();
}
 
+   private void instantiateJobManagerOverviewMetrics(MetricGroup 
jobManagerMetricGroup) {
+   long defaultTimeoutValue = 
configuration.getLong(RestOptions.CONNECTION_TIMEOUT);
+   Time defaultRequestTimeout = 
Time.milliseconds(defaultTimeoutValue);
+
+   jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> {
+   try {
+   return (long) resourceManagerGateway
 
 Review comment:
   This method results in 3 separate RPC calls being made every time metrics 
are requested. If the dispatcher cannot reasonably provide this information, 
then these metrics should be exposed at the resource manager instead.
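   
   As a sketch of that alternative (hypothetical placement and accessors, 
assuming the resource manager's `SlotManager` already tracks these counts):
   ```
// No RPC needed: the ResourceManager reads its own SlotManager directly.
resourceManagerMetricGroup.gauge("taskSlotsAvailable",
    () -> (long) slotManager.getNumberFreeSlots());
resourceManagerMetricGroup.gauge("taskSlotsTotal",
    () -> (long) slotManager.getNumberRegisteredSlots());
   ```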


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223468454
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
+   invokeLatch.await(60, TimeUnit.SECONDS);
+   waitForJob();
+   }
+
+   @Test
+   public void testJobManagerMetrics() throws Exception {
+   assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+   TestReporter reporter = 
TestReporter.OPENED_REPORTERS.iterator().next();
+
+   List<String> expectedPatterns = getExpectedPatterns();
+
+   Collection<String> gaugeNames = reporter.getGauges().values();
+
+   for (String expectedPattern : expectedPatterns) {
+   boolean 

[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642338#comment-16642338
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223468630
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
+   invokeLatch.await(60, TimeUnit.SECONDS);
+   waitForJob();
+   }
+
+   @Test
+   public void testJobManagerMetrics() throws Exception {
+   assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+   TestReporter reporter = 
TestReporter.OPENED_REPORTE

[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642339#comment-16642339
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223468454
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
+   invokeLatch.await(60, TimeUnit.SECONDS);
+   waitForJob();
+   }
+
+   @Test
+   public void testJobManagerMetrics() throws Exception {
+   assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+   TestReporter reporter = 
TestReporter.OPENED_REPORTE

[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642333#comment-16642333
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223467705
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
 
 Review comment:
   You could simplify this section by simply defining a job that has a blocking 
UDF and executing it in a separate thread.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: 

[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642335#comment-16642335
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223466964
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -909,6 +926,70 @@ private void clearDispatcherState() {
terminateJobManagerRunners();
}
 
+   private void instantiateJobManagerOverviewMetrics(MetricGroup 
jobManagerMetricGroup) {
+   long defaultTimeoutValue = 
configuration.getLong(RestOptions.CONNECTION_TIMEOUT);
+   Time defaultRequestTimeout = 
Time.milliseconds(defaultTimeoutValue);
+
+   jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> {
+   try {
+   return (long) resourceManagerGateway
+   
.requestResourceOverview(defaultRequestTimeout)
+   .get()
+   .getNumberFreeSlots();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Request resource overview occurs 
exception.", e);
+   }
+
+   return 0L;
+   });
+
+   jobManagerMetricGroup.gauge("taskSlotsTotal", () -> {
+   try {
+   return (long) resourceManagerGateway
+   
.requestResourceOverview(defaultRequestTimeout)
+   .get()
+   .getNumberRegisteredSlots();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Request resource overview occurs 
exception.", e);
+   }
+
+   return 0L;
+   });
+
+   jobManagerMetricGroup.gauge("numRegisteredTaskManagers", () -> {
+   try {
+   return (long) resourceManagerGateway
+   
.requestResourceOverview(defaultRequestTimeout)
+   .get()
+   .getNumberTaskManagers();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Request resource overview occurs 
exception.", e);
+   }
+
+   return 0L;
+   });
+
+   jobManagerMetricGroup.gauge("numRunningJobs", () -> {
+   try {
+   return (long) 
requestOverviewForAllJobs(defaultRequestTimeout)
 
 Review comment:
   This results in a wave of RPC calls to every single JM. Would it not be 
feasible to simply count the number of job IDs contained in 
`jobManagerRunnerFutures`?
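   
   For example (a sketch against the Dispatcher's existing bookkeeping, assuming 
`jobManagerRunnerFutures` holds one entry per running job):
   ```
// A local lookup instead of an RPC fan-out to every JobMaster.
jobManagerMetricGroup.gauge("numRunningJobs",
    () -> (long) jobManagerRunnerFutures.size());
   ```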


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223465173
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -909,6 +926,70 @@ private void clearDispatcherState() {
terminateJobManagerRunners();
}
 
+   private void instantiateJobManagerOverviewMetrics(MetricGroup 
jobManagerMetricGroup) {
+   long defaultTimeoutValue = 
configuration.getLong(RestOptions.CONNECTION_TIMEOUT);
+   Time defaultRequestTimeout = 
Time.milliseconds(defaultTimeoutValue);
+
+   jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> {
+   try {
+   return (long) resourceManagerGateway
 
 Review comment:
   This method results in 3 separate RPC calls being made every time metrics 
are requested. If the dispatcher cannot reasonably provide this information, 
then these metrics should be exposed at the resource manager instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223469702
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
+   invokeLatch.await(60, TimeUnit.SECONDS);
+   waitForJob();
+   }
+
+   @Test
+   public void testJobManagerMetrics() throws Exception {
+   assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+   TestReporter reporter = 
TestReporter.OPENED_REPORTERS.iterator().next();
+
+   List<String> expectedPatterns = getExpectedPatterns();
+
+   Collection<String> gaugeNames = reporter.getGauges().values();
+
+   for (String expectedPattern : expectedPatterns) {
+   boolean 

[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223466071
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -909,6 +926,70 @@ private void clearDispatcherState() {
terminateJobManagerRunners();
}
 
+   private void instantiateJobManagerOverviewMetrics(MetricGroup 
jobManagerMetricGroup) {
+   long defaultTimeoutValue = 
configuration.getLong(RestOptions.CONNECTION_TIMEOUT);
 
 Review comment:
   This is not an appropriate timeout as this is completely unrelated to the 
REST API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642334#comment-16642334
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223469702
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
+   invokeLatch.await(60, TimeUnit.SECONDS);
+   waitForJob();
+   }
+
+   @Test
+   public void testJobManagerMetrics() throws Exception {
+   assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+   TestReporter reporter = 
TestReporter.OPENED_REPORTE

[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223467705
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
 
 Review comment:
   You could simplify this section by simply defining a job that has a blocking 
UDF and executing it in a separate thread.
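
   A minimal sketch of that suggestion, reusing the names from the quoted test 
(illustrative only, not the PR's actual code; the latch-based invokable is an 
assumption):

```java
// The job stays RUNNING until release() is called, so the test thread is
// free to assert on the cluster-level metrics in the meantime.
public static class BlockingInvokable extends AbstractInvokable {
	private static final CountDownLatch BLOCKER = new CountDownLatch(1);

	public BlockingInvokable(Environment environment) {
		super(environment);
	}

	@Override
	public void invoke() throws Exception {
		BLOCKER.await(); // block until the test releases the job
	}

	public static void release() {
		BLOCKER.countDown();
	}
}

// Submit the job from a separate thread so setUp() does not block on it:
new Thread(() -> {
	try {
		clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
	} catch (Exception e) {
		throw new RuntimeException(e);
	}
}).start();
```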


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223466964
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -909,6 +926,70 @@ private void clearDispatcherState() {
terminateJobManagerRunners();
}
 
+   private void instantiateJobManagerOverviewMetrics(MetricGroup 
jobManagerMetricGroup) {
+   long defaultTimeoutValue = 
configuration.getLong(RestOptions.CONNECTION_TIMEOUT);
+   Time defaultRequestTimeout = 
Time.milliseconds(defaultTimeoutValue);
+
+   jobManagerMetricGroup.gauge("taskSlotsAvailable", () -> {
+   try {
+   return (long) resourceManagerGateway
+   
.requestResourceOverview(defaultRequestTimeout)
+   .get()
+   .getNumberFreeSlots();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Request resource overview occurs 
exception.", e);
+   }
+
+   return 0L;
+   });
+
+   jobManagerMetricGroup.gauge("taskSlotsTotal", () -> {
+   try {
+   return (long) resourceManagerGateway
+   
.requestResourceOverview(defaultRequestTimeout)
+   .get()
+   .getNumberRegisteredSlots();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Request resource overview occurs 
exception.", e);
+   }
+
+   return 0L;
+   });
+
+   jobManagerMetricGroup.gauge("numRegisteredTaskManagers", () -> {
+   try {
+   return (long) resourceManagerGateway
+   
.requestResourceOverview(defaultRequestTimeout)
+   .get()
+   .getNumberTaskManagers();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Request resource overview occurs 
exception.", e);
+   }
+
+   return 0L;
+   });
+
+   jobManagerMetricGroup.gauge("numRunningJobs", () -> {
+   try {
+   return (long) 
requestOverviewForAllJobs(defaultRequestTimeout)
 
 Review comment:
   This results in a wave of RPC calls to every single JM. Would it not be 
feasible to simply count the number of job IDs contained in 
`jobManagerRunnerFutures`?
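
   A rough sketch of that alternative, assuming `jobManagerRunnerFutures` is 
the Dispatcher's map of running jobs keyed by JobID (illustrative, not the 
PR's code):

```java
// Derive numRunningJobs from local Dispatcher state instead of fanning
// out an RPC to other components on every gauge read.
jobManagerMetricGroup.gauge("numRunningJobs",
	() -> (long) jobManagerRunnerFutures.size());
```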


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223468630
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java
 ##
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization of the job manager metrics.
+ */
+public class JobManagerMetricsITCase {
+
+   private static final String JOB_MANAGER_METRICS_PREFIX = 
"localhost.jobmanager.";
+   private static final String TASK_SLOTS_AVAILABLE = "taskSlotsAvailable";
+   private static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
+   private static final String NUM_REGISTERED_TASK_MANAGERS = 
"numRegisteredTaskManagers";
+   private static final String NUM_RUNNING_JOBS = "numRunningJobs";
+
+   private static CountDownLatch invokeLatch;
+
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @ClassRule
+   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+   new MiniClusterResourceConfiguration.Builder()
+   .setConfiguration(getConfiguration())
+   .setNumberTaskManagers(1)
+   .setNumberSlotsPerTaskManager(1)
+   .build());
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   MINI_CLUSTER_RESOURCE.getClusterClient() instanceof 
MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) 
MINI_CLUSTER_RESOURCE.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
+   invokeLatch.await(60, TimeUnit.SECONDS);
+   waitForJob();
+   }
+
+   @Test
+   public void testJobManagerMetrics() throws Exception {
+   assertEquals(1, TestReporter.OPENED_REPORTERS.size());
+   TestReporter reporter = 
TestReporter.OPENED_REPORTERS.iterator().next();
+
+   List<String> expectedPatterns = getExpectedPatterns();
+
+   Collection<String> gaugeNames = reporter.getGauges().values();
+
+   for (String expectedPattern : expectedPatterns) {
+   boolean 

[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642337#comment-16642337
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r223466071
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -909,6 +926,70 @@ private void clearDispatcherState() {
terminateJobManagerRunners();
}
 
+   private void instantiateJobManagerOverviewMetrics(MetricGroup 
jobManagerMetricGroup) {
+   long defaultTimeoutValue = 
configuration.getLong(RestOptions.CONNECTION_TIMEOUT);
 
 Review comment:
   This is not an appropriate timeout, as these calls are completely unrelated 
to the REST API.
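
One hedged alternative, assuming the RPC ask timeout ({{AkkaOptions.ASK_TIMEOUT}}, 
key {{akka.ask.timeout}}) is the setting that actually governs these gateway 
calls (illustrative only, not the PR's code):

{code:java}
// Base the gauge timeout on the RPC ask timeout rather than the REST
// connection timeout.
scala.concurrent.duration.Duration askTimeout =
	scala.concurrent.duration.Duration.create(
		configuration.getString(AkkaOptions.ASK_TIMEOUT));
Time defaultRequestTimeout = Time.milliseconds(askTimeout.toMillis());
{code}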


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://<jobmanager-host>:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-08 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642185#comment-16642185
 ] 

Andrey Zagrebin commented on FLINK-10295:
-

The pull request adds another way to provide program args: as a list of strings. 
The most secure option is to pass the program args as a JSON list in the body of 
the HTTP request. This way is cross-platform, and the args need no tokenisation 
because they arrive already tokenised in the list. The follow-up issue 
FLINK-10515 is to still improve the tokenisation, to simplify passing args as a 
single string in the web UI, at least for unix-style escaping.
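
For illustration, the run request body could then carry the args pre-tokenised, 
e.g. (the {{programArgsList}} field name is taken from the pull request and 
should be treated as an assumption here):

{noformat}
POST /jars/:jarid/run
{
  "programArgsList": ["--input", "a path with spaces", "--limit", "-5"]
}
{noformat}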

> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised as a 
> result single quote (') and double quote(") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10515) Improve tokenisation of program args passed as string

2018-10-08 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-10515:
---

 Summary: Improve tokenisation of program args passed as string
 Key: FLINK-10515
 URL: https://issues.apache.org/jira/browse/FLINK-10515
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.7.0
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin


At the moment, the tokenisation of program args does not respect escape 
characters. It can be improved to support at least program args separated by 
spaces with unix-style escaping.
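
A minimal sketch of such unix-style tokenisation (illustrative only, not the 
eventual Flink implementation): split on unquoted whitespace, let a backslash 
escape the next character, and let quotes group characters into a single token.

{code:java}
import java.util.ArrayList;
import java.util.List;

public final class ArgsTokenizer {

	public static List<String> tokenize(String input) {
		List<String> tokens = new ArrayList<>();
		StringBuilder current = new StringBuilder();
		char quote = 0;          // 0 = outside quotes, else the open quote char
		boolean escaped = false; // previous char was an unquoted backslash
		boolean inToken = false;
		for (char c : input.toCharArray()) {
			if (escaped) {
				current.append(c); // escaped char is taken literally
				escaped = false;
			} else if (c == '\\' && quote != '\'') {
				escaped = true;    // backslash is literal inside single quotes
				inToken = true;
			} else if (quote != 0) {
				if (c == quote) {
					quote = 0;     // closing quote
				} else {
					current.append(c);
				}
			} else if (c == '\'' || c == '"') {
				quote = c;         // opening quote also starts a token ("" is valid)
				inToken = true;
			} else if (Character.isWhitespace(c)) {
				if (inToken) {
					tokens.add(current.toString());
					current.setLength(0);
					inToken = false;
				}
			} else {
				current.append(c);
				inToken = true;
			}
		}
		if (inToken) {
			tokens.add(current.toString());
		}
		return tokens;
	}
}
{code}

With this, {{tokenize("--query \"select * from t\" --limit -5")}} yields 
{{["--query", "select * from t", "--limit", "-5"]}}, keeping the quoted SQL as 
one arg.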



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642147#comment-16642147
 ] 

ASF GitHub Bot commented on FLINK-10514:


Guibo-Pan opened a new pull request #6806: [FLINK-10514][docs] change tachyon 
to alluxio in doc
URL: https://github.com/apache/flink/pull/6806
 
 
   ## What is the purpose of the change
   *Since Tachyon was renamed to Alluxio, replace the remaining Tachyon 
references with Alluxio in the docs.*
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Assignee: Guibo Pan
>Priority: Minor
>  Labels: pull-request-available
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10514:
---
Labels: pull-request-available  (was: )

> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Assignee: Guibo Pan
>Priority: Minor
>  Labels: pull-request-available
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan opened a new pull request #6806: [FLINK-10514][docs] change tachyon to alluxio in doc

2018-10-08 Thread GitBox
Guibo-Pan opened a new pull request #6806: [FLINK-10514][docs] change tachyon 
to alluxio in doc
URL: https://github.com/apache/flink/pull/6806
 
 
   ## What is the purpose of the change
   *Since Tachyon was renamed to Alluxio, replace the remaining Tachyon 
references with Alluxio in the docs.*
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread Guibo Pan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642120#comment-16642120
 ] 

Guibo Pan commented on FLINK-10514:
---

It seems that the work was almost done in 
[FLINK-4185|https://issues.apache.org/jira/browse/FLINK-4185], with just a 
little left. I will go on to finish it.

> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Priority: Minor
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread Guibo Pan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guibo Pan reassigned FLINK-10514:
-

Assignee: Guibo Pan

> change Tachyon to Alluxio
> -
>
> Key: FLINK-10514
> URL: https://issues.apache.org/jira/browse/FLINK-10514
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: yangxiaoshuo
>Assignee: Guibo Pan
>Priority: Minor
>
> Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642090#comment-16642090
 ] 

ASF GitHub Bot commented on FLINK-10340:


yanghua commented on issue #6700: [FLINK-10340][table] Add Cosh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6700#issuecomment-427896213
 
 
   It's strange. It seems Travis is still in progress, but the detail page 
shows green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Cosh math function supported in Table API and SQL
> -
>
> Key: FLINK-10340
> URL: https://issues.apache.org/jira/browse/FLINK-10340
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Sergey Tsvetkov
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Implement udf of cosh, just like in oracle
> [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL

2018-10-08 Thread GitBox
yanghua commented on issue #6700: [FLINK-10340][table] Add Cosh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6700#issuecomment-427896213
 
 
   It's strange. It seems Travis is still in progress, but the detail page 
shows green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642082#comment-16642082
 ] 

ASF GitHub Bot commented on FLINK-10135:


yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-427895003
 
 
   @zentol I have changed the test case, please review it again, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://<jobmanager-host>:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-08 Thread GitBox
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-427895003
 
 
   @zentol I have changed the test case, please review it again, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10126) There should be a Scala DataSource

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642080#comment-16642080
 ] 

ASF GitHub Bot commented on FLINK-10126:


yanghua commented on issue #6738: [FLINK-10126] There should be a Scala 
DataSource
URL: https://github.com/apache/flink/pull/6738#issuecomment-427894706
 
 
   Hi @tillrohrmann I have updated this PR based on your suggestion; however, 
the `japicmp-maven-plugin` reported the incompatibility issue 
`METHOD_RETURN_TYPE_CHANGED`. It seems the method's signature is being 
verified. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> There should be a Scala DataSource
> --
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexis Sarda-Espinosa
>Assignee: vinoyang
>Priority: Minor
>  Labels: datasource, pull-request-available, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, 
> whereas the Scala version returns a DataSet. There is no Scala DataSource 
> wrapper, and the Scala DataSet does not provide the Java DataSource methods, 
> such as getSplitDataProperties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6738: [FLINK-10126] There should be a Scala DataSource

2018-10-08 Thread GitBox
yanghua commented on issue #6738: [FLINK-10126] There should be a Scala 
DataSource
URL: https://github.com/apache/flink/pull/6738#issuecomment-427894706
 
 
   Hi @tillrohrmann I have updated this PR based on your suggestion; however, 
the `japicmp-maven-plugin` reported the incompatibility issue 
`METHOD_RETURN_TYPE_CHANGED`. It seems the method's signature is being 
verified. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642059#comment-16642059
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223416786
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
 ##
 @@ -52,30 +52,29 @@ protected void validate(ParameterTool parameter) {
} catch (Exception e) {
fail();
 
 Review comment:
   I refactored it as 
   
   ```java
try {
byte[] b = InstantiationUtil.serializeObject(parameter);
final ParameterTool copy = 
InstantiationUtil.deserializeObject(b, getClass().getClassLoader());
internalValidate(copy);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
   ```
   
   this keeps the root cause and saves us from modifying the signature of 
methods calling `AbstractParameterToolTest#validate`. If you prefer to 
propagate it, I am not opposed to it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a weird implementation which Flink developers 
> would struggle to parse quickly.
> The main problem is that, when parsing args, we always try to get a key-value 
> pair, but the implementation iterates with a {{for}} loop, thus introducing 
> weird flags/mutable variables and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223416786
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
 ##
 @@ -52,30 +52,29 @@ protected void validate(ParameterTool parameter) {
} catch (Exception e) {
fail();
 
 Review comment:
   I refactored it as 
   
   ```java
try {
byte[] b = InstantiationUtil.serializeObject(parameter);
final ParameterTool copy = 
InstantiationUtil.deserializeObject(b, getClass().getClassLoader());
internalValidate(copy);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
   ```
   
   this keeps the root cause and saves us from modifying the signature of 
methods calling `AbstractParameterToolTest#validate`. If you prefer to 
propagate it, I am not opposed to it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642053#comment-16642053
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223415739
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-  

[GitHub] TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223415739
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   key = null;
-   value = null;
-   expectValue = false;
-   }
-   if (key != null && key.length() == 0) {
-   throw new IllegalA

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642049#comment-16642049
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223415460
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
 
 Review comment:
   It is up to you.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a weird implementation which Flink developers 
> would struggle to parse quickly.
> The main problem is that, when parsing args, we always try to get a key-value 
> pair, but the implementation iterates with a {{for}} loop, thus introducing 
> weird flags/mutable variables and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r22341
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   key = null;
-   value = null;
-   expectValue = false;
-   }
-   if (key != null && key.length() == 0) {
-   throw new IllegalA

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642051#comment-16642051
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r22341
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-  

[GitHub] TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
TisonKun commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223415460
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
 
 Review comment:
   It is up to you.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642043#comment-16642043
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on issue #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-427886511
 
 
   @GJL thanks for your review and detailed comments! I have pushed a commit to 
address most of them. It is midnight in my timezone, so I will be offline for a 
few hours. I am glad to see any beneficial modifications made while merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a weird implementation which Flink developers 
> would struggle to parse quickly.
> The main problem is that, when parsing args, we always try to get a key-value 
> pair, but the implementation iterates with a {{for}} loop, thus introducing 
> weird flags/mutable variables and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
TisonKun commented on issue #6737: [FLINK-10399] Refactor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-427886511
 
 
   @GJL thanks for your review and detailed comments! I have pushed a commit to 
address most of them. It is midnight in my timezone, so I will be offline for a 
few hours. I am glad to see any beneficial modifications made while merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642021#comment-16642021
 ] 

ASF GitHub Bot commented on FLINK-10247:


Clarkkkkk commented on issue #6759: [FLINK-10247][Metrics] Run 
MetricQueryService in a dedicated actor system
URL: https://github.com/apache/flink/pull/6759#issuecomment-427880375
 
 
   Well, that makes sense @tillrohrmann. And maybe we will rename the actor 
system member variable if other components rely on it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Run MetricQueryService in separate thread pool
> --
>
> Key: FLINK-10247
> URL: https://issues.apache.org/jira/browse/FLINK-10247
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Clarkkkkk commented on issue #6759: [FLINK-10247][Metrics] Run MetricQueryService in a dedicated actor system

2018-10-08 Thread GitBox
Clarkkkkk commented on issue #6759: [FLINK-10247][Metrics] Run 
MetricQueryService in a dedicated actor system
URL: https://github.com/apache/flink/pull/6759#issuecomment-427880375
 
 
   Well, that makes sense @tillrohrmann. And maybe we will rename the actor 
system member variable if other components rely on it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641990#comment-16641990
 ] 

ASF GitHub Bot commented on FLINK-5542:
---

GJL commented on issue #6775: [FLINK-5542] use YarnCluster vcores setting to do 
MaxVCore validation
URL: https://github.com/apache/flink/pull/6775#issuecomment-427871290
 
 
   LGTM, merging as soon as build is green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> YARN client incorrectly uses local YARN config to check vcore capacity
> --
>
> Key: FLINK-5542
> URL: https://issues.apache.org/jira/browse/FLINK-5542
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.4, 1.5.3, 1.6.0, 1.7.0
>Reporter: Shannon Carey
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> See 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html
> When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 
> 1.1.4 is comparing the user's selected number of vcores to the vcores 
> configured in the local node's YARN config (from YarnConfiguration eg. 
> yarn-site.xml and yarn-default.xml). It incorrectly prevents Flink from 
> launching even if there is sufficient vcore capacity on the cluster.
> That is not correct, because the application will not necessarily run on the 
> local node. For example, if running the yarn-session.sh client from the AWS 
> EMR master node, the vcore count there may be different from the vcore count 
> on the core nodes where Flink will actually run.
> A reasonable way to fix this would probably be to reuse the logic from 
> "yarn-session.sh -q" (FlinkYarnSessionCli line 550) which knows how to get 
> vcore information from the real worker nodes.  Alternatively, perhaps we 
> could remove the check entirely and rely on YARN's Scheduler to determine 
> whether sufficient resources exist.
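
A rough sketch of the first suggestion, assuming a {{YarnClient}} instance is 
available ({{requestedVcores}} is a hypothetical parameter; this is 
illustrative, not the pull request's exact code):

{code:java}
import java.io.IOException;

import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

// Validate against the capacity of the cluster's running nodes (as
// "yarn-session.sh -q" does) instead of the local yarn-site.xml.
static void validateVcores(YarnClient yarnClient, int requestedVcores)
		throws YarnException, IOException {
	int maxVcoresInCluster = yarnClient.getNodeReports(NodeState.RUNNING).stream()
		.mapToInt(node -> node.getCapability().getVirtualCores())
		.max()
		.orElse(0); // no running nodes reported

	if (requestedVcores > maxVcoresInCluster) {
		throw new IllegalConfigurationException(
			"Requested " + requestedVcores + " vcores per container, but the "
				+ "largest node in the cluster only offers " + maxVcoresInCluster + ".");
	}
}
{code}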



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on issue #6775: [FLINK-5542] use YarnCluster vcores setting to do MaxVCore validation

2018-10-08 Thread GitBox
GJL commented on issue #6775: [FLINK-5542] use YarnCluster vcores setting to do 
MaxVCore validation
URL: https://github.com/apache/flink/pull/6775#issuecomment-427871290
 
 
   LGTM, merging as soon as build is green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641964#comment-16641964
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map<String, String> map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641962#comment-16641962
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map<String, String> map = new HashMap<String, String>(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   

[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   key = null;
-   value = null;
-   expectValue = false;
-   }
-   if (key != null && key.length() == 0) {
-   throw new IllegalArgume

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641960#comment-16641960
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   

[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   key = null;
-   value = null;
-   expectValue = false;
-   }
-   if (key != null && key.length() == 0) {
-   throw new IllegalArgume

[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   key = null;
-   value = null;
-   expectValue = false;
-   }
-   if (key != null && key.length() == 0) {
-   throw new IllegalArgume

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641961#comment-16641961
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   

[GitHub] GJL commented on a change in pull request #6737: [FLINK-10399] Refactor ParameterTool#fromArgs

2018-10-08 Thread GitBox
GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   key = null;
-   value = null;
-   expectValue = false;
-   }
-   if (key != null && key.length() == 0) {
-   throw new IllegalArgume

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641956#comment-16641956
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223391917
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
+
+   if (args[i].startsWith("--")) {
+   key = args[i].substring(2);
+   } else if (args[i].startsWith("-")) {
+   key = args[i].substring(1);
} else {
-   if (expectValue) {
-   value = arg;
-   expectValue = false;
-   } else {
-   throw new RuntimeException("Error 
parsing arguments '" + Arrays.toString(args) + "' on '" + arg + "'. Unexpected 
value. Please prefix values with -- or -.");
-   }
+   throw new IllegalArgumentException(
+   String.format(errorMessage + " Please 
prefix keys with -- or -.",
+   Arrays.toString(args), 
args[i]));
}
 
-   if (value == null && key == null) {
-   throw new IllegalStateException("Value and key 
can not be null at the same time");
-   }
-   if (key != null && value == null && !expectValue) {
-   throw new IllegalStateException("Value expected 
but flag not set");
-   }
-   if (key != null && value != null) {
-   map.put(key, value);
-   

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641957#comment-16641957
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223311299
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
 
 Review comment:
   There are no tests asserting that `Error parsing arguments [...]` is part of 
the error message.
   
   Imo the test `testIllegalArgs` should be rewritten as follows:
   
   ```
@Test
public void testThrowExceptionIfParameterIsNotPrefixed() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Error parsing arguments '[a]' on 'a'. 
Please prefix keys with -- or -.");
   
ParameterTool.fromArgs(new String[]{"a"});
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink 
> developers struggle to follow quickly.
> The main problem is that, when parsing args, we always try to read a key-value 
> pair, but the implementation iterates with a {{for}} loop, which introduces odd 
> flags, mutable variables, and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
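
For reference, the `exception` field used in the suggested test is presumably a JUnit 4 `ExpectedException` rule; a minimal sketch of the test class skeleton that snippet assumes (the class name is illustrative):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class ParameterToolPrefixTest {

    // JUnit 4 rule that lets a test declare the expected exception and message.
    @Rule
    public final ExpectedException exception = ExpectedException.none();

    @Test
    public void testThrowExceptionIfParameterIsNotPrefixed() {
        exception.expect(IllegalArgumentException.class);
        exception.expectMessage(
            "Error parsing arguments '[a]' on 'a'. Please prefix keys with -- or -.");

        ParameterTool.fromArgs(new String[]{"a"});
    }
}
```

Note that `expectMessage` matches a substring, so the assertion also pins down the wording discussed above.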

[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641958#comment-16641958
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223348499
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
 ##
 @@ -52,30 +52,29 @@ protected void validate(ParameterTool parameter) {
} catch (Exception e) {
fail();
 
 Review comment:
   Since you are already cleaning up:
   The test will fail if an exception is propagated. There is no need to 
catch it and fail explicitly. Also, we are currently masking the root cause. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink 
> developers struggle to follow quickly.
> The main problem is that, when parsing args, we always try to read a key-value 
> pair, but the implementation iterates with a {{for}} loop, which introduces odd 
> flags, mutable variables, and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
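
A hedged before/after illustration of that point; `validate` stands in for the real assertions and is not the actual test body:

```java
import org.junit.Assert;
import org.junit.Test;

public class PropagationSketchTest {

    // Before: catching and calling fail() hides the root cause in the report.
    @Test
    public void catchAndFail() {
        try {
            validate();
        } catch (Exception e) {
            Assert.fail(); // the stack trace of 'e' is lost
        }
    }

    // After: declare 'throws' and let the exception propagate; JUnit still
    // fails the test but keeps the original stack trace.
    @Test
    public void letItPropagate() throws Exception {
        validate();
    }

    private void validate() throws Exception {
        // placeholder for the real assertions
    }
}
```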


[jira] [Commented] (FLINK-10399) Refactor ParameterTool#fromArgs

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641955#comment-16641955
 ] 

ASF GitHub Bot commented on FLINK-10399:


GJL commented on a change in pull request #6737: [FLINK-10399] Refactor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#discussion_r223394447
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
 ##
 @@ -68,81 +68,46 @@
 * @return A {@link ParameterTool}
 */
public static ParameterTool fromArgs(String[] args) {
-   Map map = new HashMap(args.length / 2);
-
-   String key = null;
-   String value = null;
-   boolean expectValue = false;
-   for (String arg : args) {
-   // check for -- argument
-   if (arg.startsWith("--")) {
-   if (expectValue) {
-   // we got into a new key, even though 
we were a value --> current key is one without value
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   map.put(key, NO_VALUE_KEY);
-   // key will be overwritten in the next 
step
-   }
-   key = arg.substring(2);
-   expectValue = true;
-   } // check for - argument
-   else if (arg.startsWith("-")) {
-   // we are waiting for a value, so this is a - 
prefixed value (negative number)
-   if (expectValue) {
-
-   if (NumberUtils.isNumber(arg)) {
-   // negative number
-   value = arg;
-   expectValue = false;
-   } else {
-   if (value != null) {
-   throw new 
IllegalStateException("Unexpected state");
-   }
-   // We waited for a value but 
found a new key. So the previous key doesnt have a value.
-   map.put(key, NO_VALUE_KEY);
-   key = arg.substring(1);
-   expectValue = true;
-   }
-   } else {
-   // we are not waiting for a value, so 
its an argument
-   key = arg.substring(1);
-   expectValue = true;
-   }
+   final Map map = new HashMap<>(args.length / 2);
+
+   final String errorMessage = "Error parsing arguments '%s' on 
'%s'.";
+
+   int i = 0;
+   while (i < args.length) {
+   final String key;
 
 Review comment:
   Optional:
   
   The function could benefit from _extract method_ refactoring:
   
   ```
   final String key = getNextKey(args, i);
   ```
   
   ```
private static String getNextKey(final String[] args, final int index) {
final String arg = args[index];
final String key;
   
if (arg.startsWith("--")) {
key = arg.substring(2);
} else if (arg.startsWith("-")) {
key = arg.substring(1);
} else {
throw new IllegalArgumentException(
String.format("Error parsing arguments '%s' on 
'%s'." + " Please prefix keys with -- or -.",
Arrays.toString(args), arg));
}
   
if (key.length() == 0) {
throw new IllegalArgumentException(
String.format("The input %s contains an empty 
argument",
Arrays.toString(args)));
}
return key;
}
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink 
> developers struggle to follow quickly.
> The main problem is that, when parsing args, we always try to read a key-value 
> pair, but the implementation iterates with a {{for}} loop, which introduces odd 
> flags, mutable variables, and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
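
As a usage sketch of the method under review (the argument values are illustrative and follow the cases handled by the parsing loop above):

```java
import org.apache.flink.api.java.utils.ParameterTool;

public class FromArgsUsage {
    public static void main(String[] args) {
        ParameterTool params = ParameterTool.fromArgs(
            new String[]{"--input", "words.txt", "-n", "-5", "--verbose"});

        String input = params.get("input");      // "words.txt"
        int n = params.getInt("n");              // -5: the '-' prefixed negative-number case
        boolean verbose = params.has("verbose"); // flag without a value

        System.out.println(input + " " + n + " " + verbose);
    }
}
```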


[jira] [Created] (FLINK-10514) change Tachyon to Alluxio

2018-10-08 Thread yangxiaoshuo (JIRA)
yangxiaoshuo created FLINK-10514:


 Summary: change Tachyon to Alluxio
 Key: FLINK-10514
 URL: https://issues.apache.org/jira/browse/FLINK-10514
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: yangxiaoshuo


Since *Tachyon* was renamed to *Alluxio*, we should update the docs as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641859#comment-16641859
 ] 

ASF GitHub Bot commented on FLINK-10156:


fhueske opened a new pull request #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805
 
 
   ## What is the purpose of the change
   
   * Deprecate `Table.writeToSink()` as discussed in FLINK-10156
   
   ## Brief change log
   
   * Deprecate `Table.writeToSink()`
   * Port usage of `writeToSink()` to `insertInto()`
   
   ## Verifying this change
   
   * run existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? Docs have been adjusted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce API duplication.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
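
To illustrate the insertInto() style the PR ports callers to, a minimal sketch against the Flink 1.6/1.7-era Table API; the sink path, table names, and schema are illustrative:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class InsertIntoExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Table result = tEnv.fromDataStream(
            env.fromElements(Tuple2.of("hello", 1L), Tuple2.of("world", 2L)), "word, cnt");

        // Register the sink in the catalog ...
        tEnv.registerTableSink(
            "outTable",
            new String[]{"word", "cnt"},
            new TypeInformation<?>[]{Types.STRING, Types.LONG},
            new CsvTableSink("/tmp/out.csv", "|"));

        // ... and write to it, instead of result.writeToSink(new CsvTableSink(...)).
        result.insertInto("outTable");
        env.execute();
    }
}
```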


[jira] [Updated] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10156:
---
Labels: pull-request-available  (was: )

> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce API duplication.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

