[jira] [Commented] (FLINK-7551) Add VERSION to the REST urls.

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589893#comment-16589893
 ] 

ASF GitHub Bot commented on FLINK-7551:
---

zentol commented on a change in pull request #6602: [FLINK-7551][rest] Add versioning to REST API
URL: https://github.com/apache/flink/pull/6602#discussion_r212220053
 
 

 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -365,21 +366,34 @@ public String getRestBaseUrl() {
 	}
 
 	private static void registerHandler(Router router, Tuple2 specificationHandler) {
 -		switch (specificationHandler.f0.getHttpMethod()) {
 +		final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL();
 +		// setup versioned urls
 +		for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) {
 +			final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
 +			registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
 +		}
 +		// setup unversioned url for convenience and backwards compatibility
 +		// this url will always point to the oldest supported version
 
 Review comment:
   will add a test to verify this behavior
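  For illustration, the prefix-then-fallback registration shown in the diff above can be sketched outside of Flink's classes. The version prefixes, the `register` helper, and the route map below are hypothetical stand-ins, not the actual `RestAPIVersion`/`Router` API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VersionedUrlSketch {

    // Hypothetical stand-in for RestAPIVersion#getURLVersionPrefix, oldest version first.
    static final List<String> SUPPORTED_VERSIONS = List.of("v1", "v2");

    // Maps each registered URL to the API version that handles it.
    static Map<String, String> register(String handlerURL) {
        Map<String, String> routes = new LinkedHashMap<>();
        // versioned urls: one route per supported version
        for (String prefix : SUPPORTED_VERSIONS) {
            routes.put('/' + prefix + handlerURL, prefix);
        }
        // unversioned url for convenience and backwards compatibility;
        // it always points to the oldest supported version
        routes.put(handlerURL, SUPPORTED_VERSIONS.get(0));
        return routes;
    }

    public static void main(String[] args) {
        System.out.println(register("/jobs"));
    }
}
```

  A test of the promised behavior would then assert that the unversioned route resolves to the oldest version.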
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add VERSION to the REST urls. 
> --
>
> Key: FLINK-7551
> URL: https://issues.apache.org/jira/browse/FLINK-7551
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This is to guarantee that we can update the REST API without breaking 
> existing third-party clients.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589873#comment-16589873
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StefanRRichter commented on a change in pull request #6604: [FLINK-9061] Optionally add entropy to checkpoint paths for better S3 scalability
URL: https://github.com/apache/flink/pull/6604#discussion_r212216578
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/CheckpointPathFilter.java
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.Path;
+
+/**
+ * A transformer that may modify paths for checkpoint data and metadata files.
+ */
+public interface CheckpointPathFilter {
 
 Review comment:
   `PathTransformer` might be a better name for what it does (see your own comment).
   
   I also wonder if we could simply collapse the path + transformer separation into a `PathProvider` or `PathFactory`, and we could have one for data and one for metadata. This can be a bit tricky for `FsCheckpointStorageLocation`, but the current solution also seems problematic.




> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
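The rewrite described above can be sketched as a small path transformer. CRC32 here is just an arbitrary stand-in for whatever hash the real implementation would use, and the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class EntropyPathSketch {

    // Rewrites s3://bucket/flink/checkpoints to s3://bucket/<hash>/flink/checkpoints.
    // Hashing the original path spreads writes across S3 partitions, since S3
    // partitions on the leading characters of the object key.
    static String injectEntropy(String path) {
        int schemeEnd = path.indexOf("://") + 3;       // skip the "s3://" scheme
        int bucketEnd = path.indexOf('/', schemeEnd);  // end of the bucket name
        CRC32 crc = new CRC32();
        crc.update(path.getBytes(StandardCharsets.UTF_8));
        String hash = Long.toHexString(crc.getValue());
        return path.substring(0, bucketEnd) + '/' + hash + path.substring(bucketEnd);
    }

    public static void main(String[] args) {
        System.out.println(injectEntropy("s3://bucket/flink/checkpoints"));
    }
}
```

Because the hash is a pure function of the original path, the rewrite is deterministic, so checkpoint restore can recompute the same location.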





[jira] [Commented] (FLINK-10172) Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589876#comment-16589876
 ] 

ASF GitHub Bot commented on FLINK-10172:


twalthr commented on issue #6585: [FLINK-10172][table] re-adding asc & desc suffix expression to expressionParser
URL: https://github.com/apache/flink/pull/6585#issuecomment-415329206
 
 
   Thank you @walterddr. I would open an issue for this minor thing to at least 
track it.




> Inconsistency in ExpressionParser and ExpressionDsl for order by asc/desc
> --
>
> Key: FLINK-10172
> URL: https://issues.apache.org/jira/browse/FLINK-10172
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.3, 1.4.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.3.3, 1.4.3, 1.6.1, 1.7.0, 1.5.4
>
>
> The following expression throws an exception when parsing the {{"id.asc"}} term.
> {code:java}
> Table allOrders = orderTable
> .select("id,order_date,amount,customer_id")
> .orderBy("id.asc");
> {code}
> while it is correctly parsed for Scala:
> {code:scala}
> val allOrders:Table = orderTable
> .select('id, 'order_date, 'amount, 'customer_id)
> .orderBy('id.asc)
> {code}
> This suggests some inconsistency between ExpressionParser and ExpressionDsl.







[jira] [Updated] (FLINK-9928) Add LOG2 function for table/sql API

2018-08-23 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-9928:

Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-6810

> Add LOG2 function for table/sql API
> ---
>
> Key: FLINK-9928
> URL: https://issues.apache.org/jira/browse/FLINK-9928
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> refer to : 
> https://dev.mysql.com/doc/refman/8.0/en/mathematical-functions.html#function_log2
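Semantically, LOG2(x) = ln(x) / ln(2). A minimal sketch of the scalar computation follows; the class and method names are illustrative only, not Flink's actual built-in function plumbing:

```java
public class Log2Sketch {

    // LOG2(x) = ln(x) / ln(2). MySQL returns NULL for x <= 0,
    // modeled here as Double.NaN for simplicity.
    static double log2(double x) {
        return x > 0 ? Math.log(x) / Math.log(2.0) : Double.NaN;
    }

    public static void main(String[] args) {
        System.out.println(log2(8.0));
    }
}
```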





[jira] [Commented] (FLINK-8868) Support Table Function as Table for Stream Sql

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589862#comment-16589862
 ] 

ASF GitHub Bot commented on FLINK-8868:
---

pnowojski commented on a change in pull request #6574: [FLINK-8868] [table] Support Table Function as Table Source for Stream Sql
URL: https://github.com/apache/flink/pull/6574#discussion_r212213682
 
 

 ##
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ##
 @@ -897,6 +897,45 @@ class SqlITCase extends StreamingWithStateTestBase {
 
 assertEquals(List(expected.toString()), StreamITCase.testResults.sorted)
   }
+
+  @Test
 
 Review comment:
   What's the problem with Table API here? I had a suspicion that this 
`ValidationException`:
   ```
   def getRelNode: RelNode = if (containsUnboundedUDTFCall(logicalPlan)) {
     throw new ValidationException("Cannot translate a query with an unbounded table function call.")
   } else {
     logicalPlan.toRelNode(relBuilder)
   }
   ```
is being thrown mostly as a precaution, since previously there was no 
execution code to support it. Now (with this PR) that will not be the case 
anymore. What would happen if we simply removed it?




> Support Table Function as Table for Stream Sql
> --
>
> Key: FLINK-8868
> URL: https://issues.apache.org/jira/browse/FLINK-8868
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>  Labels: pull-request-available
>
> for stream SQL:
> support SQL like: SELECT * FROM TABLE(tf("a"))
> for batch SQL:
> a UDTF might produce infinite records; this needs to be discussed






[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589854#comment-16589854
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StefanRRichter commented on a change in pull request #6604: [FLINK-9061] Optionally add entropy to checkpoint paths for better S3 scalability
URL: https://github.com/apache/flink/pull/6604#discussion_r212211288
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 ##
 @@ -63,6 +78,7 @@ public FsCheckpointStorage(
 	this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
 	this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
 	this.fileSizeThreshold = fileSizeThreshold;
 +	this.pathFilter = checkNotNull(pathFilter);
 
 Review comment:
   Isn't this problematic here: if we have a concept that paths can contain an 
entropy key, we still call `mkdirs` on the "unfiltered" paths in the following 
lines. It seems to clash a bit with the storage location idea. 










