[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182016#comment-16182016
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/4729

[FLINK-7076] [ResourceManager] implement YARN stopWorker logic

## What is the purpose of the change

*Implement stopWorker logic for YarnResourceManager*


## Brief change log

  - *Added a ConcurrentHashMap to keep the ResourceID to YARN ContainerId 
mappings (see the hedged sketch below)*
  - *Implemented the stopWorker logic for YARN*
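
Below is a minimal sketch of the approach the change log describes, assuming only Flink's `ResourceID`, YARN's `Container`, and YARN's `AMRMClientAsync#releaseAssignedContainer`; the class name and method shapes are illustrative, not the PR's actual code:

```java
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// Illustrative sketch only: track each worker's YARN container so that
// stopWorker can release it back to YARN on dynamic scale-down.
class WorkerContainerTracker {

    private final ConcurrentHashMap<ResourceID, Container> workerNodeMap = new ConcurrentHashMap<>();
    private final AMRMClientAsync<?> resourceManagerClient;

    WorkerContainerTracker(AMRMClientAsync<?> resourceManagerClient) {
        this.resourceManagerClient = resourceManagerClient;
    }

    /** Remember the container backing a started worker. */
    void workerStarted(ResourceID resourceId, Container container) {
        workerNodeMap.put(resourceId, container);
    }

    /** Release the worker's container; returns false if the worker is unknown. */
    boolean stopWorker(ResourceID resourceId) {
        Container container = workerNodeMap.remove(resourceId);
        if (container == null) {
            return false;
        }
        resourceManagerClient.releaseAssignedContainer(container.getId());
        return true;
    }
}
```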


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4-node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink implement-stopWorker-yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4729


commit e82cb580956d3ee323e40f9c5335cf645b32b99d
Author: Shuyi Chen 
Date:   2017-09-27T05:23:34Z

implement YARN stopWorker logic




> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7675) LatestCompletedCheckpointExternalPathGauge should check if external path exists

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181968#comment-16181968
 ] 

ASF GitHub Bot commented on FLINK-7675:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4709
  
+1


> LatestCompletedCheckpointExternalPathGauge should check if external path exists
> -
>
> Key: FLINK-7675
> URL: https://issues.apache.org/jira/browse/FLINK-7675
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Blocker
>
> For internal checkpoints, {{CompletedCheckpointStats.getExternalPath()}} could 
> be {{null}}.
> This leads {{LatestCompletedCheckpointExternalPathGauge}} to return a 
> {{null}} value to {{MetricDumpSerialization}}, which then throws a 
> {{NullPointerException}} in the {{serializeGauge}} function.
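
A minimal sketch of the kind of guard the description calls for, using only Flink's public {{Gauge}} interface; the wrapper class, the supplier, and the "n/a" placeholder are illustrative assumptions, not the actual fix in the PR:

{code:java}
import java.util.function.Supplier;

import org.apache.flink.metrics.Gauge;

// Illustrative sketch: never hand a null value to the metric serializer.
public class SafeExternalPathGauge implements Gauge<String> {

	// e.g. () -> stats.getExternalPath(), which can be null for internal checkpoints
	private final Supplier<String> externalPath;

	public SafeExternalPathGauge(Supplier<String> externalPath) {
		this.externalPath = externalPath;
	}

	@Override
	public String getValue() {
		String path = externalPath.get();
		return path != null ? path : "n/a";
	}
}
{code}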



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181958#comment-16181958
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141241955
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobCancellationHandler}.
+ */
+public class JobCancellationHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, JobMessageParameters> {
+
+   private static final JobCancellationHeaders INSTANCE = new JobCancellationHeaders();
+
+   private JobCancellationHeaders() {}
+
+   @Override
+   public Class<EmptyRequestBody> getRequestClass() {
+   return EmptyRequestBody.class;
+   }
+
+   @Override
+   public Class<EmptyResponseBody> getResponseClass() {
+   return EmptyResponseBody.class;
+   }
+
+   @Override
+   public HttpResponseStatus getResponseStatusCode() {
+   return HttpResponseStatus.ACCEPTED;
+   }
+
+   @Override
+   public JobMessageParameters getUnresolvedMessageParameters() {
+   return new JobMessageParameters();
+   }
+
+   @Override
+   public HttpMethodWrapper getHttpMethod() {
+   return HttpMethodWrapper.DELETE;
+   }
+
+   @Override
+   public String getTargetRestEndpointURL() {
+   return "/jobs/:jobid";
--- End diff --

To avoid typos and to make the code easier to search, please make this a 
public/private static final String.
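
For illustration, a hedged sketch of what the suggestion amounts to (the constant name `URL` is assumed, not from the PR):

```java
public static final String URL = "/jobs/:jobid";

@Override
public String getTargetRestEndpointURL() {
	return URL;
}
```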


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181959#comment-16181959
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141241750
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobNotFoundException.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which is returned if a job could not be found.
+ */
+public class JobNotFoundException extends FlinkException {
--- End diff --

I prefer `FlinkJobNotFoundException`


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181960#comment-16181960
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141241787
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobNotFoundException.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which is returned if a job could not be found.
+ */
+public class JobNotFoundException extends FlinkException {
+
+   private static final long serialVersionUID = -7803390762010615384L;
+
+   public JobNotFoundException(JobID jobId) {
+   super("Could not find job (" + jobId + ").");
--- End diff --

I feel "Could not find Flink job ..." is better


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181961#comment-16181961
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141242109
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPathParameter.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Path parameter identifying jobs.
+ *
+ * The type of this parameter is {@link JobID}.
+ */
+public class JobPathParameter extends MessagePathParameter<JobID> {
--- End diff --

I was actually going to create the same class for 
https://issues.apache.org/jira/browse/FLINK-7694, as described in that ticket :)

I prefer `JobIDPathParameter`


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1

2017-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7642:
--
Description: 
The Surefire 2.19 release introduced more useful test filters which would let us 
run a subset of the tests.


This issue is for upgrading maven surefire plugin to 2.19.1

  was:
The Surefire 2.19 release introduced more useful test filters which would let us 
run a subset of the tests.

This issue is for upgrading maven surefire plugin to 2.19.1


> Upgrade maven surefire plugin to 2.19.1
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> The Surefire 2.19 release introduced more useful test filters which would let 
> us run a subset of the tests.
> This issue is for upgrading maven surefire plugin to 2.19.1



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-09-26 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181953#comment-16181953
 ] 

Bowen Li commented on FLINK-7694:
-

[~till.rohrmann] Hi Till, I have done some research on how to accomplish this 
migration. To make sure I'm on the right path, I want to discuss with you my 
rough understanding and ask some questions about the code architecture.

Here are the steps I think I need to take:


1. add class 

{code:java}
public class JobIDPathParameter extends MessagePathParameter<JobID>
{code}

2. add class

{code:java}
public class JobIDMessageParameters extends MessageParameters {
private final JobIDPathParameter jobIDPathParameter;
...
}
{code}

3. add class 

{code:java}
public class JobMetricsOverview implements ResponseBody {
public static final String FIELD_NAME_METRICS = "metrics";

@JsonProperty(FIELD_NAME_METRICS)
private final  Map metrics;

...
}
{code}


4. change {{JobMetricsHandler}}'s signature to 

{code:java}
public class JobMetricsHandler extends AbstractMetricsHandler implements LegacyRestHandler
{code}



Do the above steps make sense?

What I haven't figured out yet: where is {{JobMetricsOverview}} (the 
ResponseBody) consumed? And is there an example of an integration test?
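
As a hedged illustration of step 1, assuming {{MessagePathParameter}} takes its key in the constructor and exposes {{convertFromString}}/{{convertToString}} hooks (the exact hook names and the "jobid" key are assumptions, not confirmed by the thread):

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;

// Illustrative sketch only.
public class JobIDPathParameter extends MessagePathParameter<JobID> {

	public JobIDPathParameter() {
		super("jobid");
	}

	@Override
	protected JobID convertFromString(String value) {
		return JobID.fromHexString(value);
	}

	@Override
	protected String convertToString(JobID value) {
		return value.toString();
	}
}
{code}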


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4704: [hotfix] [docs] Update function name in DataStream API

2017-09-26 Thread phiradet
Github user phiradet commented on the issue:

https://github.com/apache/flink/pull/4704
  
Thanks


---


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-26 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181847#comment-16181847
 ] 

Xingcan Cui commented on FLINK-7548:


Thanks for the explanation, [~fhueske]. I was too paranoid about the problem.

> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from the rowtime 
> field. We can provide a new interface called {{DefinedWatermark}}, which has 
> two methods, {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig

2017-09-26 Thread Raymond Tay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181812#comment-16181812
 ] 

Raymond Tay commented on FLINK-7669:


Checked, but it's still occurring; I'll investigate this a little bit more.

> org.apache.flink.api.common.ExecutionConfig cannot be cast to 
> org.apache.flink.api.common.ExecutionConfig
> -
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>
> Latest code pulled from trunk threw errors at runtime when I ran a job 
> against it, but when I ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stacktrace. 
> An exception is being thrown:
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
> submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1358)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:484)
>   at 
> 

[jira] [Commented] (FLINK-7549) CEP - Pattern not discovered if source streaming is very fast

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181718#comment-16181718
 ] 

Paolo Rendano commented on FLINK-7549:
--

mentioned in

> CEP - Pattern not discovered if source streaming is very fast
> -
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
>Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress tests on my pattern, using JMeter to populate source 
> data on a RabbitMQ queue. This queue contains statuses generated by different 
> devices. In my test case I loop over 1000 cycles, each one sending the first 
> and then the second status that together generate the event using Flink CEP 
> (statuses keyed by device). I expect to get an output of 1000 events.
> In my early tests I noticed that I get only partial results in output (70/80% 
> of the expected ones). Introducing a delay in the JMeter plan between sending 
> the two statuses solved the problem. The minimum delay that makes things work 
> is 20/25 ms (of course this is on my local machine; on other machines it may 
> vary).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
>
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>     env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
>             conf.getSourceExchange(),
>             conf.getSourceRoutingKey(),
>             conf.getSourceQueueName(),
>             true,
>             new MyMessageWrapperSchema()))
>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>             private static final long serialVersionUID = -1L;
>
>             @Override
>             public long extractTimestamp(MyMessageWrapper element) {
>                 if (element.getData().get("stateTimestamp") == null) {
>                     throw new RuntimeException("Status Timestamp is null during time ordering for device ["
>                             + element.getData().get("deviceCode") + "]");
>                 }
>                 return FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>             }
>         })
>         .name("MyIncomingStatus");
>
> // PATTERN DEFINITION
> Pattern myPattern = Pattern
>     .begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>     .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>     .within(Time.minutes(3));
>
> // CEP DEFINITION
> PatternStream myPatternStream = CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream outputStream = myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
>
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp", 
> what happens is that at such a high message rate, the source may receive 
> messages with the same timestamp but with different deviceCodes. 
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181713#comment-16181713
 ] 

Paolo Rendano edited comment on FLINK-7606 at 9/26/17 10:59 PM:


Hi [~kkl0u],
1) sure I have to set it, and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double-checked this, and without that setting I have a memory leak (as 
reported by [~matteoferrario29]). Looking at the memory after the test, it seems 
that used keys are disposed (the memory comes back to its initial size after the 
last GC). Example (after processing 100k keys, 2 msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've rerun my test related to issue FLINK-7549, adding more logs and 
checking the result again, and now it seems that all the expected events are 
generated, but... the last chunk of events (maybe thousands) is not generated 
until I run the test again (even 1 more message is enough to trigger the 
generation of all the remaining events). It seems the minimum is about 5k 
input messages before it starts to flush out the buffer. So the question is: 
*can you explain the strategy used to flush out the generation of events*? How 
is it triggered? As it is now, it can block the generation of events until a 
new message is processed (maybe one with a watermark that exceeds that +10 sec).
Just one answer to your questions regarding my last test scenario: parallelism 
is 1, Idle precedes Start, the Idle timestamp is set to x and the Start 
timestamp is set to x+1sec, no delay is set between messages, and during the 
test I see the watermark advancing. Since the generation of messages in JMeter 
runs in a cycle with no delay between cycles, x+1sec of one cycle can be 
greater than x of the following cycle. This was set intentionally by me to 
verify the reordering of events.

Thanks
Paolo



was (Author: i...@paolorendano.it):
Hi [~kkl0u],
1) sure I have to set it, and I was not setting it in my test: 
{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}
I've double-checked this, and without that setting I have a memory leak (as 
reported by [~matteoferrario29]). Looking at the memory after the test, it seems 
that used keys are disposed (the memory comes back to its initial size after the 
last GC). Example (after processing 100k keys, 2 msgs/key): 
[^Schermata 2017-09-27 alle 00.35.53.png]

2) I've rerun my test related to issue FLINK-7606, adding more logs and 
checking the result again, and now it seems that all the expected events are 
generated, but... the last chunk of events (maybe thousands) is not generated 
until I run the test again (even 1 more message is enough to trigger the 
generation of all the remaining events). It seems the minimum is about 5k 
input messages before it starts to flush out the buffer. So the question is: 
*can you explain the strategy used to flush out the generation of events*? How 
is it triggered? As it is now, it can block the generation of events until a 
new message is processed (maybe one with a watermark that exceeds that +10 sec).
Just one answer to your questions regarding my last test scenario: parallelism 
is 1, Idle precedes Start, the Idle timestamp is set to x and the Start 
timestamp is set to x+1sec, no delay is set between messages, and during the 
test I see the watermark advancing. Since the generation of messages in JMeter 
runs in a cycle with no delay between cycles, x+1sec of one cycle can be 
greater than x of the following cycle. This was set intentionally by me to 
verify the reordering of events.

Thanks
Paolo


> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they come within a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated), but it is not cleaned even after the 5 minutes of 

[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176654#comment-16176654
 ] 

Paolo Rendano edited comment on FLINK-7606 at 9/26/17 10:59 PM:


Hi [~kkl0u],
sorry, this week was hard for me too. Next week for sure I'll have some time to 
check your suggestions and run the trial. Regarding my open issue FLINK-7549, 
I see [~matteoferrario29] is also losing events in CEP. I'll dig deeper next 
week.




was (Author: i...@paolorendano.it):
Hi [~kkl0u],
sorry, this week was hard for me too. Next week for sure I'll have some time to 
check your suggestions and run the trial. Regarding my open issue FLINK-7606, 
I see [~matteoferrario29] is also losing events in CEP. I'll dig deeper next 
week.



> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they come within a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated), but it is not cleaned even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7606) CEP operator leaks state

2017-09-26 Thread Paolo Rendano (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paolo Rendano updated FLINK-7606:
-
Attachment: Schermata 2017-09-27 alle 00.35.53.png

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png, 
> Schermata 2017-09-27 alle 00.35.53.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they come within a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated), but it is not cleaned even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-09-26 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181315#comment-16181315
 ] 

Bowen Li commented on FLINK-7648:
-

[~till.rohrmann] Hi Till, I built Flink from source, copied 
flink-runtime-web-xxx.jar to {{build/lib}} (so flink-runtime-web is on the 
classpath), and ran {{bin/start-cluster.sh flip6}}, but I couldn't connect to 
http://localhost:8081 from my browser. Did I miss anything?

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7670) typo in docs runtime section

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181263#comment-16181263
 ] 

ASF GitHub Bot commented on FLINK-7670:
---

Github user keweishang commented on the issue:

https://github.com/apache/flink/pull/4706
  
@tzulitai thanks for merging the fix!


> typo in docs runtime section
> 
>
> Key: FLINK-7670
> URL: https://issues.apache.org/jira/browse/FLINK-7670
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Kewei SHANG
>Priority: Minor
> Fix For: 1.3.3
>
>
> The following link to Savepoints page
> [Savepoints](..//setup/savepoints.html)
> change to
> [Savepoints](../setup/savepoints.html)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7670) typo in docs runtime section

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181264#comment-16181264
 ] 

ASF GitHub Bot commented on FLINK-7670:
---

Github user keweishang closed the pull request at:

https://github.com/apache/flink/pull/4706


> typo in docs runtime section
> 
>
> Key: FLINK-7670
> URL: https://issues.apache.org/jira/browse/FLINK-7670
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Kewei SHANG
>Priority: Minor
> Fix For: 1.3.3
>
>
> The following link to Savepoints page
> [Savepoints](..//setup/savepoints.html)
> change to
> [Savepoints](../setup/savepoints.html)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181245#comment-16181245
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4722#discussion_r141136758
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1946,4 +1974,44 @@ public File getInstanceBasePath() {
public boolean supportsAsynchronousSnapshots() {
return true;
}
+
+   private static class RocksIteratorWrapper<K> implements Iterator<K> {
+   private final RocksIterator iterator;
+   private final String field;
+   private final TypeSerializer<K> keySerializer;
+   private final int keyGroupPrefixBytes;
+
+   public RocksIteratorWrapper(
+   RocksIterator iterator,
+   String field,
+   TypeSerializer<K> keySerializer,
+   int keyGroupPrefixBytes) {
+   this.iterator = Preconditions.checkNotNull(iterator);
+   this.field = Preconditions.checkNotNull(field);
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
+   }
+
+   @Override
+   public boolean hasNext() {
+   return iterator.isValid();
+   }
+
+   @Override
+   public K next() {
+   if (!iterator.isValid()) {
--- End diff --

minor: I usually call `hasNext()` here, otherwise it'll duplicate logic

```
if (!hasNext()) {
    throw new NoSuchElementException();
}
```


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed-state operator (to do so, the 
> operator must iterate over all of the existing keys and rewrite them into a 
> new state variable).
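
A hedged sketch of the migration loop the description envisions, assuming the new backend method has a shape like {{getKeys(state, namespace)}} returning a stream of keys (the exact signature in the PR may differ, and "old-state" is an illustrative name):

{code:java}
import java.util.stream.Stream;

import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;

// Illustrative sketch only.
public class StateMigration {
	public static <K> void rewriteKeys(KeyedStateBackend<K> backend) {
		try (Stream<K> keys = backend.getKeys("old-state", VoidNamespace.INSTANCE)) {
			keys.forEach(key -> {
				backend.setCurrentKey(key);
				// read the value from the old state variable, write it into the new one
			});
		}
	}
}
{code}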



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

2017-09-26 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181234#comment-16181234
 ] 

Fabian Hueske commented on FLINK-7548:
--

I think there might be a misunderstanding of the 
{{TimestampsAndPeriodicWatermarksOperator}}.

bq. About my last question, I actually refer to the 
TimestampsAndPeriodicWatermarksOperator. Here, the "periodic" refers to 
proctime. Considering the time systems for the rowtime and the proctime may not 
be synchronized (i.e., they get different speeds), could we consider providing 
a "rowtime periodic" assigner?

The assigner does not emit watermarks that have a processing-time _value_; it 
only emits them at regular intervals that are based on processing time.
So, whenever the interval has passed, the operator asks the 
{{AssignerWithPeriodicWatermarks}} to return the current watermark. The 
watermark should be based on the event-time timestamps that the 
{{AssignerWithPeriodicWatermarks}} has observed (in fact this depends on the 
implementation of the assigner). So the value of the watermarks is based on 
event time, but the interval in which the watermarks are generated is not. 
AFAIK, this is the most commonly used watermark generation strategy, so it 
seems to suit many users.
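
To make the distinction concrete, here is a minimal sketch of such an assigner; the {{Tuple2<String, Long>}} element type (key, event timestamp) and the class name are illustrative, not from the thread. The watermark *value* tracks the largest observed event-time timestamp, while the *frequency* of {{getCurrentWatermark()}} calls is a processing-time setting:

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Illustrative sketch of an event-time-valued, processing-time-triggered assigner.
public class MaxTimestampWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

	private long maxObservedTimestamp = Long.MIN_VALUE;

	@Override
	public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
		maxObservedTimestamp = Math.max(maxObservedTimestamp, element.f1);
		return element.f1;
	}

	@Override
	public Watermark getCurrentWatermark() {
		// Polled at the processing-time interval configured via
		// ExecutionConfig#setAutoWatermarkInterval; the value is event-time based.
		return new Watermark(maxObservedTimestamp - 1);
	}
}
{code}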

> Support watermark generation for TableSource
> 
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> As discussed in FLINK-7446, the TableSource currently only supports defining 
> a rowtime field, but does not support extracting watermarks from the rowtime 
> field. We can provide a new interface called {{DefinedWatermark}}, which has 
> two methods, {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7683) Add method to iterate over all of the existing keys in a statebackend

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181246#comment-16181246
 ] 

ASF GitHub Bot commented on FLINK-7683:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4722
  
LGTM


> Add method to iterate over all of the existing keys in a statebackend
> -
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> This is required to make it possible to preserve backward compatibility while 
> changing the state definition of a keyed state operator (to do so, the operator 
> must iterate over all of the existing keys and rewrite them into a new state 
> variable).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181224#comment-16181224
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4710
  
Thanks for the review @twalthr.
I've updated the PR. 

@haohui: This PR preserves the current logic that time attributes are 
exposed as `TIMESTAMP`. I agree that support for time indicators that expose 
themselves as `Long` is desirable. However, this requires quite a few changes 
as we need to extend several functions (incl. `TUMBLE`, `HOP`, etc.) and 
validation logic in some operators (over windows, joins, etc.). So this is not 
a lightweight change and should be done as a separate issue, IMO. 


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just like when registering a DataStream, 
> the rowtime field could either be appended or replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181204#comment-16181204
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r141130569
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
 ---
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
   )
 util.verifyTable(t, expected)
   }
+
+  @Test
+  def testProjectableProcTimeTableSource(): Unit = {
+// ensures that projection is not pushed into table source with 
proctime indicators
+val util = streamTestUtil()
+
+val projectableTableSource = new TestProctimeSource("pTime") with 
ProjectableTableSource[Row] {
+  override def projectFields(fields: Array[Int]): TableSource[Row] = {
+// ensure this method is not called!
+Assert.fail()
+null.asInstanceOf[TableSource[Row]]
+  }
+}
+util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+val t = util.tableEnv.scan("PTimeTable")
+  .select('name, 'val)
+  .where('val > 10)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+"StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, 
name, pTime])",
+term("select", "name", "val"),
+term("where", ">(val, 10)")
+  )
+util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProjectableRowTimeTableSource(): Unit = {
--- End diff --

Yes, that should not be a problem. Projection push-down is not possible 
because the schema of the table is partially constructed inside the Table 
API (e.g., by appending the proctime attribute). 


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just like when registering a DataStream, 
> the rowtime field could either be appended or replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7696) Add projection push-down support for TableSources with time attributes

2017-09-26 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7696:


 Summary: Add projection push-down support for TableSources with 
time attributes
 Key: FLINK-7696
 URL: https://issues.apache.org/jira/browse/FLINK-7696
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: Fabian Hueske


Table sources that implement the {{DefinedProctimeAttribute}} or 
{{DefinedRowtimeAttribute}} do not support projection push-down even if they 
also implement {{ProjectableTableSource}}. 

There are several problems:
- the schema of a {{TableSource}} that implements {{DefinedRowtimeAttribute}} 
or {{DefinedProctimeAttribute}} is constructed in the catalog, not in the 
{{TableSource}} (proctime fields are always appended at the end).
- the {{ProjectableTableSource.projectFields()}} method returns the projected 
fields as int indices. In order to handle the indices correctly, the 
TableSource would need to know about the internals of the Table API.
- {{ProjectableTableSource.projectFields()}} might reorder fields and move a 
proctime field into the middle of the schema. However, the TableSource has no 
control over that.
- A {{TableSource}} that implements {{DefinedRowtimeAttribute}} or 
{{DefinedProctimeAttribute}} would need to change the return values of 
{{getRowtimeAttribute()}} or {{getProctimeAttribute()}} depending on whether 
the attribute is kept or not.

Adjusting the schema of table sources inside the Table API makes all of this 
quite messy. Maybe we need to refine the interfaces. For instance, we could ask 
users to explicitly add time indicator fields in the {{TypeInformation}} 
returned by {{TableSource.getReturnType()}}. However, that might collide with 
plans to add computable time attributes as proposed in FLINK-7548.
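
A toy illustration of the index-handling problem (the schema and `project()` helper are hypothetical, not Table API code):

```
// Shows how index-based projection can silently reorder the schema and
// move a time attribute, which the TableSource cannot anticipate.
public class ProjectionExample {

    static String[] project(String[] schema, int[] fields) {
        String[] projected = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            projected[i] = schema[fields[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        String[] schema = {"id", "val", "name", "pTime"};
        // projection [3, 0] moves the proctime attribute to the front
        String[] projected = project(schema, new int[]{3, 0});
        System.out.println(String.join(", ", projected)); // pTime, id
    }
}
```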



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181183#comment-16181183
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4728#discussion_r141123168
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -227,5 +229,19 @@ void failSlot(final ResourceID taskManagerId,
 */
void heartbeatFromResourceManager(final ResourceID resourceID);
 
+   /**
+* Request the details of the executed job.
+*
+* @param timeout for the rpc call
+* @return Future details of the executed job
+*/
CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);
--- End diff --

are we still keeping this for the job master?


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Once we support an offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667), we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.
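
The refresh idea can be sketched as a small time-bounded cache (a simplified, hypothetical stand-in; the actual ExecutionGraphCache in the PR differs):

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Entries older than refreshIntervalMillis trigger a new fetch, e.g. a
// fresh AccessExecutionGraph request to the JobMaster.
final class RefreshingCache<K, V> {

    private static final class Entry<V> {
        final long fetchTime;
        final CompletableFuture<V> value;

        Entry(long fetchTime, CompletableFuture<V> value) {
            this.fetchTime = fetchTime;
            this.value = value;
        }
    }

    private final long refreshIntervalMillis;
    private final Function<K, CompletableFuture<V>> fetcher;
    private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();

    RefreshingCache(long refreshIntervalMillis, Function<K, CompletableFuture<V>> fetcher) {
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.fetcher = fetcher;
    }

    CompletableFuture<V> get(K key) {
        final long now = System.currentTimeMillis();
        return cache.compute(key, (k, old) ->
            (old == null || now - old.fetchTime > refreshIntervalMillis)
                ? new Entry<>(now, fetcher.apply(k)) // stale or missing: re-fetch
                : old
        ).value;
    }
}
```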



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181182#comment-16181182
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4728#discussion_r141125073
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -38,5 +44,36 @@
 * @param timeout for this operation
 * @return Future REST endpoint address
 */
-   CompletableFuture<String> requestRestAddress(Time timeout);
+   CompletableFuture<String> requestRestAddress(@RpcTimeout Time timeout);
+
+   /**
+* Requests the AccessExecutionGraph for the given jobId. If there is 
no such graph, then
+* the future is completed with a {@link JobNotFoundException}.
+*
+* @param jobId identifying the job whose AccessExecutionGraph is 
requested
+* @param timeout for the asynchronous operation
+* @return Future containing the AccessExecutionGraph for the given 
jobId, otherwise {@link JobNotFoundException}
+*/
+   CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);
--- End diff --

Well this seems questionable. These methods technically aren't related to 
being a restful gateway.

This seems like an odd attempt to reduce code duplication by just moving 
the code one layer up without other considerations.


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Once we support an offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667), we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181181#comment-16181181
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4728#discussion_r141122442
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -228,55 +245,55 @@ public WebRuntimeMonitor(
} else {
serverSSLContext = null;
}
-   metricFetcher = new MetricFetcher(retriever, 
queryServiceRetriever, executor, timeout);
+   metricFetcher = new MetricFetcher(retriever, 
queryServiceRetriever, scheduledExecutor, timeout);
 
String defaultSavepointDir = 
config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
 
-   JobCancellationWithSavepointHandlers cancelWithSavepoint = new 
JobCancellationWithSavepointHandlers(currentGraphs, executor, 
defaultSavepointDir);
+   JobCancellationWithSavepointHandlers cancelWithSavepoint = new 
JobCancellationWithSavepointHandlers(executionGraphCache, scheduledExecutor, 
defaultSavepointDir);
RuntimeMonitorHandler triggerHandler = 
handler(cancelWithSavepoint.getTriggerHandler());
RuntimeMonitorHandler inProgressHandler = 
handler(cancelWithSavepoint.getInProgressHandler());
 
Router router = new Router();
// config how to interact with this web server
-   get(router, new DashboardConfigHandler(executor, 
cfg.getRefreshInterval()));
+   get(router, new DashboardConfigHandler(scheduledExecutor, 
cfg.getRefreshInterval()));
--- End diff --

well i see an easy way to reduce the diff...


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Once we support offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181172#comment-16181172
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r141124496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
 ---
@@ -33,6 +32,9 @@ class PushProjectIntoTableSourceScanRule extends 
RelOptRule(
   override def matches(call: RelOptRuleCall): Boolean = {
 val scan: FlinkLogicalTableSourceScan = 
call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
 scan.tableSource match {
+  // projection pushdown is not supported for sources that provide 
time indicators
+  case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => 
false
--- End diff --

Yes, will do, but it is not easy to solve IMO.


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just like when registering a DataStream, 
> the rowtime field could either be appended or replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181158#comment-16181158
 ] 

ASF GitHub Bot commented on FLINK-7649:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4700#discussion_r141121095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -26,9 +26,10 @@
  *
  * A job related REST handler always requires a {@link 
JobPathParameter}.
  */
-public class JobMessageParameters extends MessageParameters {
+public class JobTerminationMessageParameters extends MessageParameters {
--- End diff --

did you intentionally modify an existing class? Will this class not be used 
by several other handlers in the future as well?


> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobStoppingHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181151#comment-16181151
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r141120508
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobCancellationHandler}.
+ */
+public class JobCancellationHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, JobMessageParameters> {
+
+   private static final JobCancellationHeaders INSTANCE = new 
JobCancellationHeaders();
+
+   private JobCancellationHeaders() {}
+
+   @Override
+   public Class getRequestClass() {
+   return EmptyRequestBody.class;
+   }
+
+   @Override
+   public Class getResponseClass() {
+   return EmptyResponseBody.class;
+   }
+
+   @Override
+   public HttpResponseStatus getResponseStatusCode() {
+   return HttpResponseStatus.ACCEPTED;
+   }
+
+   @Override
+   public JobMessageParameters getUnresolvedMessageParameters() {
+   return new JobMessageParameters();
+   }
+
+   @Override
+   public HttpMethodWrapper getHttpMethod() {
+   return HttpMethodWrapper.DELETE;
--- End diff --

I still think this should not be a DELETE since we can still access 
information about that job under the URL that we just issued the delete call 
to, even after the delete has finished.


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7663) Allow AbstractRestHandler to handle bad requests

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181143#comment-16181143
 ] 

ASF GitHub Bot commented on FLINK-7663:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4699#discussion_r141119299
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 ---
@@ -237,8 +300,8 @@ public TestParameters getUnresolvedMessageParameters() {
}
 
private static class TestParameters extends MessageParameters {
-   private final JobIDPathParameter jobIDPathParameter = new 
JobIDPathParameter();
-   private final JobIDQueryParameter jobIDQueryParameter = new 
JobIDQueryParameter();
+   protected final JobIDPathParameter jobIDPathParameter = new 
JobIDPathParameter();
--- End diff --

what is this change for?


> Allow AbstractRestHandler to handle bad requests
> 
>
> Key: FLINK-7663
> URL: https://issues.apache.org/jira/browse/FLINK-7663
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} parses the request and tries to generate a 
> {{HandlerRequest}}. If this fails, then the server answers with an internal 
> server error. Instead, we should allow the {{AbstractRestHandler}} to return 
> a BAD_REQUEST status code. In order to do that, I would like to 
> introduce a {{HandlerRequestException}} which can be thrown while creating 
> the {{HandlerRequest}}. If this exception is thrown, then we return an error 
> message with a {{BAD_REQUEST}} {{HttpResponseStatus}}.
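
A minimal sketch of the proposed exception type (assuming it simply carries a message and an optional cause; the actual class in the PR may differ):

```
// Thrown while creating a HandlerRequest; the handler maps it to BAD_REQUEST.
public class HandlerRequestException extends Exception {

    public HandlerRequestException(String message) {
        super(message);
    }

    public HandlerRequestException(String message, Throwable cause) {
        super(message, cause);
    }
}
```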



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7663) Allow AbstractRestHandler to handle bad requests

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181145#comment-16181145
 ] 

ASF GitHub Bot commented on FLINK-7663:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4699#discussion_r141119740
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 ---
@@ -162,8 +225,8 @@ public void testEndpoints() throws Exception {
 
@Override
protected CompletableFuture<TestResponse> 
handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, 
RestfulGateway gateway) throws RestHandlerException {
-   
Assert.assertEquals(request.getPathParameter(JobIDPathParameter.class), 
PATH_JOB_ID);
-   
Assert.assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0),
 QUERY_JOB_ID);
+   
assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
--- End diff --

How about we stick to the import pattern that already exists in a given 
file instead of switching back and forth?


> Allow AbstractRestHandler to handle bad requests
> 
>
> Key: FLINK-7663
> URL: https://issues.apache.org/jira/browse/FLINK-7663
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} parses the request and tries to generate a 
> {{HandlerRequest}}. If this fails, then the server answers with an internal 
> server error. Instead, we should allow the {{AbstractRestHandler}} to return 
> a BAD_REQUEST status code. In order to do that, I would like to 
> introduce a {{HandlerRequestException}} which can be thrown while creating 
> the {{HandlerRequest}}. If this exception is thrown, then we return an error 
> message with a {{BAD_REQUEST}} {{HttpResponseStatus}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7663) Allow AbstractRestHandler to handle bad requests

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181144#comment-16181144
 ] 

ASF GitHub Bot commented on FLINK-7663:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4699#discussion_r141118884
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 ---
@@ -86,46 +98,97 @@ public void testEndpoints() throws Exception {
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT);
 
-   RestServerEndpoint serverEndpoint = new 
TestRestServerEndpoint(serverConfig, testHandler);
-   RestClient clientEndpoint = new TestRestClient(clientConfig);
+   serverEndpoint = new TestRestServerEndpoint(serverConfig, 
testHandler);
+   clientEndpoint = new TestRestClient(clientConfig);
 
-   try {
-   serverEndpoint.start();
-
-   TestParameters parameters = new TestParameters();
-   parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-   
parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-   // send first request and wait until the handler blocks
-   CompletableFuture<TestResponse> response1;
-   synchronized (TestHandler.LOCK) {
-   response1 = clientEndpoint.sendRequest(
-   serverConfig.getEndpointBindAddress(),
-   serverConfig.getEndpointBindPort(),
-   new TestHeaders(),
-   parameters,
-   new TestRequest(1));
-   TestHandler.LOCK.wait();
-   }
+   serverEndpoint.start();
+   }
+
+   @After
+   public void teardown() {
+   if (clientEndpoint != null) {
+   clientEndpoint.shutdown(timeout);
+   clientEndpoint = null;
+   }
 
-   // send second request and verify response
-   CompletableFuture<TestResponse> response2 = 
clientEndpoint.sendRequest(
-   serverConfig.getEndpointBindAddress(),
-   serverConfig.getEndpointBindPort(),
+   if (serverEndpoint != null) {
+   serverEndpoint.shutdown(timeout);
+   serverEndpoint = null;
+   }
+   }
+
+   @Test
+   public void testEndpoints() throws Exception {
--- End diff --

rename to "testRequestInterleaving", since we modify the line anyway


> Allow AbstractRestHandler to handle bad requests
> 
>
> Key: FLINK-7663
> URL: https://issues.apache.org/jira/browse/FLINK-7663
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} parses the request and tries to generate a 
> {{HandlerRequest}}. If this fails, then the server answers with an internal 
> server error. Instead, we should allow the {{AbstractRestHandler}} to return 
> a BAD_REQUEST status code. In order to do that, I would like to 
> introduce a {{HandlerRequestException}} which can be thrown while creating 
> the {{HandlerRequest}}. If this exception is thrown, then we return an error 
> message with a {{BAD_REQUEST}} {{HttpResponseStatus}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7663) Allow AbstractRestHandler to handle bad requests

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181122#comment-16181122
 ] 

ASF GitHub Bot commented on FLINK-7663:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4699#discussion_r141115054
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -117,11 +117,26 @@ protected void respondAsLeader(final 
ChannelHandlerContext ctx, Routed routed, T
}
}
 
+   final HandlerRequest<R, M> handlerRequest;
+
+   try {
+   handlerRequest = new HandlerRequest<>(request, 
messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), 
routed.queryParams());
+   } catch (HandlerRequestException hre) {
+   log.error("Could not create the handler 
request.", hre);
+
+   HandlerUtils.sendErrorResponse(
+   ctx,
+   httpRequest,
+   new 
ErrorResponseBody(String.format("Could not create the handler request: %s", 
hre.getMessage())),
--- End diff --

`HandlerRequest` is an internal concept that we imo should not expose. I 
would just say "Bad request, could not parse parameters."


> Allow AbstractRestHandler to handle bad requests
> 
>
> Key: FLINK-7663
> URL: https://issues.apache.org/jira/browse/FLINK-7663
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} parses the request and tries to generate a 
> {{HandlerRequest}}. If this fails, then the server answers with an internal 
> server error. Instead, we should allow the {{AbstractRestHandler}} to return 
> a BAD_REQUEST status code. In order to do that, I would like to 
> introduce a {{HandlerRequestException}} which can be thrown while creating 
> the {{HandlerRequest}}. If this exception is thrown, then we return an error 
> message with a {{BAD_REQUEST}} {{HttpResponseStatus}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181117#comment-16181117
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141105482
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0: MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 
== that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
--- End diff --

If yes, we would need to check if `MapView` supports `null` keys. If not, we 
could wrap the key in a Row of arity 1, because Row supports null serialization.
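
A minimal sketch of that Row-of-arity-1 idea (helper names are illustrative; `Row` is Flink's null-tolerant composite type):

```
import org.apache.flink.types.Row;

// Wrap each element in a Row of arity 1 before using it as a MapView key,
// so that null elements survive (de)serialization.
public final class NullSafeKeys {

    private NullSafeKeys() {}

    public static Row wrapKey(Object value) {
        Row key = new Row(1);
        key.setField(0, value); // Row serialization tolerates null fields
        return key;
    }

    @SuppressWarnings("unchecked")
    public static <E> E unwrapKey(Row key) {
        return (E) key.getField(0);
    }
}
```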


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181113#comment-16181113
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141103652
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param  The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Does SQL Multiset also support `null` values? If yes, we would need to wrap 
the `MapSerializer`.
Otherwise, the problem would be that we would need to rely on the key 
serializer to support `null`, which many serializers do not. A solution would 
be to wrap the `MapSerializer` and additionally serialize the count for `null` 
elements.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181110#comment-16181110
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141112340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
+  if (other.getKind == SqlKind.COLLECT) {
+aggregates(index) = sqlTypeName match {
+  case TINYINT =>
+new ByteCollectAggFunction
+  case SMALLINT =>
+new ShortCollectAggFunction
+  case INTEGER =>
+new IntCollectAggFunction
+  case BIGINT =>
+new LongCollectAggFunction
+  case VARCHAR | CHAR =>
+new StringCollectAggFunction
+  case FLOAT =>
+new FloatCollectAggFunction
+  case DOUBLE =>
+new DoubleCollectAggFunction
+  case _ =>
+new ObjectCollectAggFunction
+}
+  } else {
--- End diff --

The else case can be removed because we keep the catch-all.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181114#comment-16181114
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r14367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
--- End diff --

Since we add a dedicated case for `COLLECT`, this case should not be removed 
but remain at the end of this `match`.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181116#comment-16181116
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r14022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
--- End diff --

Change this case to `case collect: SqlAggFunction if collect.getKind == 
SqlKind.COLLECT =>` to have a dedicated case for this built-in function. Also, 
move the case after `case _: SqlCountAggFunction` to keep all built-in 
functions together.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181101#comment-16181101
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096269
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param  The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // 

+   //  MultisetTypeInfo specific properties
+   // 

+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // 

+   //  TypeInformation implementation
+   // 

+
+   @Override
+   public boolean isBasicType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181112#comment-16181112
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141113194
  
--- Diff: docs/dev/table/sql.md ---
@@ -2107,6 +2108,17 @@ VAR_SAMP(value)
 <p>Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values.</p>
   </td>
 </tr>
+
+<tr>
+  <td>
+  {% highlight text %}
+  COLLECT(value)
+  {% endhighlight %}
+  </td>
+  <td>
+  <p>Returns a multiset of the values.</p>
--- End diff --

Be more specific about the handling of `null` values. Are they ignored? 
What is returned if only null values are added (`null` or empty multiset)?


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181105#comment-16181105
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+      super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+      super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+      return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+      return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+      return false;
+   }
+
+   @Override
+   public int getArity() {
+      return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+      // similar as arrays, the multiset are "opaque" to the direct field addressing logic
+      // since the multiset's elements are not addressable, we do not expose them
+      return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
+      return (Class<Map<T, Integer>>)(Class<?>)Map.class;
+   }
+
+   @Override
+   public boolean isKeyType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181115#comment-16181115
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141106084
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
+        accumulator.f0.put(value, accumulator.f0.get(value) + 1)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    val iterator = accumulator.f0.iterator
+    if (iterator.hasNext) {
+      val map = new util.HashMap[E, Integer]()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        map.put(entry.getKey, entry.getValue)
+      }
+      map
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
--- End diff --

According to the specs of `COLLECT`, is null the correct return value or an 
empty Multiset?


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181109#comment-16181109
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141102418
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
--- End diff --

I'm not familiar with the specs of the `Collect` function. Should it also 
support `null` values?


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181103#comment-16181103
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141104254
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
--- End diff --

`contains` and `get` result in two map look-ups.
It is more efficient to just do `get` once and check the result for `null`; a 
sketch follows below.
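A sketch of the single-lookup variant of the quoted `accumulate` method 
(assuming, as the comment implies, that `MapView.get` returns `null` for absent 
keys):

  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    if (value != null) {
      val count = accumulator.f0.get(value) // one look-up instead of contains + get
      if (count != null) {
        accumulator.f0.put(value, count + 1)
      } else {
        accumulator.f0.put(value, 1)
      }
    }
  }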


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181108#comment-16181108
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141100624
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
--- End diff --

Usually POJOs don't need to implement `equals`


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181100#comment-16181100
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141093589
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
--- End diff --

rm newline


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141100624
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
--- End diff --

Usually POJOs don't need to implement `equals`


---


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181098#comment-16181098
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096297
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+      super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+      super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+      return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+      return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181107#comment-16181107
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096324
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+      super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+      super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+      return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+      return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+      return false;
+   }
+
+   @Override
+   public int getArity() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141095159
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Add to `org.apache.flink.table.api.Types` class for easy creation of 
`TypeInformation`
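For illustration only, such a factory might look like the sketch below; the 
method name and its placement in `Types` are assumptions, not the merged API:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.MultisetTypeInfo

object TypesSketch {
  // hypothetical helper mirroring the existing Types factories
  def MULTISET[E](elementType: TypeInformation[E]): MultisetTypeInfo[E] =
    new MultisetTypeInfo[E](elementType)
}

// usage sketch: a multiset of strings
// val multisetOfString = TypesSketch.MULTISET(BasicTypeInfo.STRING_TYPE_INFO)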


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141103652
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Does SQL Multiset also support `null` values? If yes, we would need to wrap 
the `MapSerializer`.
Otherwise, the problem would be that we would need to rely on the key 
serializer to support `null`, which many serializers do not. A solution would 
be to wrap the `MapSerializer` and additionally serialize the count for `null` 
elements (see the sketch below).
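To make the wrapping idea concrete, a minimal sketch of just the encoding, 
deliberately independent of Flink's `TypeSerializer` API (all names are 
illustrative and `String` stands in for the element type): the null count is 
written next to the map, so the element serializer never sees a `null` key.

import java.io.{DataInput, DataOutput}

object NullAwareMultisetEncoding {
  // write the count of null elements first, then the non-null entries
  def write(out: DataOutput, nullCount: Int, entries: Map[String, Int]): Unit = {
    out.writeInt(nullCount)
    out.writeInt(entries.size)
    entries.foreach { case (elem, cnt) => out.writeUTF(elem); out.writeInt(cnt) }
  }

  // read back (nullCount, entries); a wrapping serializer would re-assemble the multiset
  def read(in: DataInput): (Int, Map[String, Int]) = {
    val nullCount = in.readInt()
    val size = in.readInt()
    val entries = (1 to size).map(_ => in.readUTF() -> in.readInt()).toMap
    (nullCount, entries)
  }
}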


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141095159
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Add to `org.apache.flink.table.api.Types` class for easy creation of 
`TypeInformation`


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096324
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+      super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+      super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+      return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+      return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+      return false;
+   }
+
+   @Override
+   public int getArity() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141104254
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
--- End diff --

`contains` and `get` result in two map look-ups.
It is more efficient to just do `get` once and check the result for `null`


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141106084
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
+        accumulator.f0.put(value, accumulator.f0.get(value) + 1)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    val iterator = accumulator.f0.iterator
+    if (iterator.hasNext) {
+      val map = new util.HashMap[E, Integer]()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        map.put(entry.getKey, entry.getValue)
+      }
+      map
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
--- End diff --

According to the specs of `COLLECT`, is null the correct return value or an 
empty Multiset?


---


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181102#comment-16181102
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141099803
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
--- End diff --

add space `var f0: MapView`


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181099#comment-16181099
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141093104
  
--- Diff: docs/dev/table/sql.md ---
@@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
 | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]`  |
 | `Types.OBJECT_ARRAY`   | `ARRAY` | e.g. `java.lang.Byte[]`|
 | `Types.MAP`| `MAP`   | `java.util.HashMap`|
+| `Types.MULTISET`   | `MULTISET`  | `java.util.HashMap`|
--- End diff --

should we explain how the `HashMap` is used to represent the multiset, 
i.e., that a multiset of `String` is a `HashMap<String, Integer>`? A short 
example follows below.
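To make the representation concrete, a small self-contained sketch (values are 
illustrative): each distinct element maps to its multiplicity.

import java.util.{HashMap => JHashMap}

object MultisetRepr extends App {
  // the multiset {a, a, b} represented as a HashMap[String, Integer]
  val multiset = new JHashMap[String, Integer]()
  for (e <- Seq("a", "a", "b")) {
    val c = multiset.get(e)
    multiset.put(e, if (c == null) 1 else c + 1)
  }
  println(multiset) // {a=2, b=1}
}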


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181104#comment-16181104
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097202
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+      super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+      super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+      return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+      return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+      return false;
+   }
+
+   @Override
+   public int getArity() {
+      return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
--- End diff --

The implementation of `getTotalFields()` of `MapTypeInfo` (which returns 
`2`) is not correct.
Can you move our implementation to `MapTypeInfo`? Then we don't need to 
override it here.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181106#comment-16181106
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097263
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+      super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+      super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+      return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+      return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+      return false;
+   }
+
+   @Override
+   public int getArity() {
+      return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+      // similar as arrays, the multiset are "opaque" to the direct field addressing logic
+      // since the multiset's elements are not addressable, we do not expose them
+      return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141093589
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
--- End diff --

rm newline


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r14367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
--- End diff --

Since we add a dedicated case for `COLLECT`, this case should now remain 
at the end of this `match`.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096269
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141105482
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 
== that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
--- End diff --

If yes, we would need to check if `MapView` supports `null` keys. If not, we could wrap the key in a `Row` of arity 1 because `Row` supports null serialization.
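
For illustration, a minimal sketch of that fallback, assuming `MapView` rejects `null` keys; the `countElement` helper and its names are hypothetical and not part of this PR:

```scala
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.types.Row

// Hypothetical helper: count a possibly-null element by wrapping it in a
// Row of arity 1, since Row fields may be null and serialize safely.
def countElement[E](view: MapView[Row, Integer], element: E): Unit = {
  val key = new Row(1)
  key.setField(0, element)        // null is a legal Row field
  val cnt = view.get(key)         // null if the element was not seen yet
  view.put(key, if (cnt == null) 1 else cnt + 1)
}
```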


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096297
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097263
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+   return false;
+   }
+
+   @Override
+   public int getArity() {
+   return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+   // similar to arrays, the multiset is "opaque" to the direct field addressing logic
+   // since the multiset's elements are not addressable, we do not expose them
+   return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141102418
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 
== that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
--- End diff --

I'm not familiar with the specs of the `Collect` function. Should it also 
support `null` values?


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141099803
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
--- End diff --

add space `var f0: MapView`


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141093104
  
--- Diff: docs/dev/table/sql.md ---
@@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
 | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]`   |
 | `Types.OBJECT_ARRAY`   | `ARRAY` | e.g. `java.lang.Byte[]`|
 | `Types.MAP`| `MAP`   | `java.util.HashMap`|
+| `Types.MULTISET`   | `MULTISET`  | `java.util.HashMap`|
--- End diff --

should we explain how the `HashMap` is used to represent the multiset, 
i.e., that a multiset of `String` is a `HashMap<String, Integer>`?
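
For example (an illustrative snippet, not part of the patch), the map-based encoding pairs each element with its multiplicity:

```scala
import java.util.{HashMap => JHashMap}

// The multiset {a, a, b} over String is encoded as {a -> 2, b -> 1}.
val multiset = new JHashMap[String, Integer]()
multiset.put("a", 2)
multiset.put("b", 1)
assert(multiset.get("a") == 2)    // "a" occurs twice
```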


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r14022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
--- End diff --

Change this case to `case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>` to have a dedicated case for this built-in function. Also move the case after `case _: SqlCountAggFunction` to keep all built-in functions together.
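
A minimal, self-contained sketch of the suggested dispatch order (the `resolveAggFunction` wrapper and its return values are hypothetical; only the Calcite types and the guard come from the review):

```scala
import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
import org.apache.calcite.sql.fun.SqlCountAggFunction

// The guarded COLLECT case sits together with the other built-in
// functions, while the catch-all that throws stays last.
def resolveAggFunction(agg: SqlAggFunction): String = agg match {
  case _: SqlCountAggFunction =>
    "count"      // existing COUNT handling
  case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
    "collect"    // pick the matching *CollectAggFunction here
  case unSupported: SqlAggFunction =>
    throw new RuntimeException(s"unsupported Function: '${unSupported.getName}'")
}
```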


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141113194
  
--- Diff: docs/dev/table/sql.md ---
@@ -2107,6 +2108,17 @@ VAR_SAMP(value)
 Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values.
   
 
+
+
+  
+  {% highlight text %}
+  COLLECT(value)
+  {% endhighlight %}
+  
+  
+  Returns a multiset of the values.
--- End diff --

Be more specific about the handling of `null` values. Are they ignored? 
What is returned if only null values are added (`null` or empty multiset)?


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+   return false;
+   }
+
+   @Override
+   public int getArity() {
+   return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+   // similar to arrays, the multiset is "opaque" to the direct field addressing logic
+   // since the multiset's elements are not addressable, we do not expose them
+   return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
+   return (Class<Map<T, Integer>>) (Class<?>) Map.class;
+   }
+
+   @Override
+   public boolean isKeyType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097202
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+   return false;
+   }
+
+   @Override
+   public int getArity() {
+   return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
--- End diff --

The implementation of `getTotalFields()` of `MapTypeInfo` (which returns `2`) is not correct. Can you move your implementation to `MapTypeInfo`? Then we don't need to override it here.
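
A quick way to see the issue (a sketch assuming Flink's type utilities on the classpath; the reported value of `2` is taken from the comment above):

```scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.MapTypeInfo

// MapTypeInfo currently reports two addressable fields, although a map,
// like an array, is opaque to field addressing and should count as one.
val mapInfo = new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
println(mapInfo.getTotalFields)   // 2 today; 1 once the fix moves here
```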


---


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141112340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
+  if (other.getKind == SqlKind.COLLECT) {
+aggregates(index) = sqlTypeName match {
+  case TINYINT =>
+new ByteCollectAggFunction
+  case SMALLINT =>
+new ShortCollectAggFunction
+  case INTEGER =>
+new IntCollectAggFunction
+  case BIGINT =>
+new LongCollectAggFunction
+  case VARCHAR | CHAR =>
+new StringCollectAggFunction
+  case FLOAT =>
+new FloatCollectAggFunction
+  case DOUBLE =>
+new DoubleCollectAggFunction
+  case _ =>
+new ObjectCollectAggFunction
+}
+  } else {
--- End diff --

The else case can be removed because we keep the catch-all.


---


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181093#comment-16181093
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r14717
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPathParameter.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Path parameter identifying jobs.
+ *
+ * The type of this parameter is {@link JobID}.
--- End diff --

I think we can remove this.


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181092#comment-16181092
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r14201
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -55,6 +55,14 @@
 public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
 
/**
+* Cancel the currently executed job.
--- End diff --

it would be more consistent with the javadoc of the other methods to say "Cancels".


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7650) Port JobCancellationHandler to new REST endpoint

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181095#comment-16181095
 ] 

ASF GitHub Bot commented on FLINK-7650:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4697#discussion_r14338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -55,6 +55,14 @@
 public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
 
/**
+* Cancel the currently executed job.
+*
+* @param timeout of this operation
+* @return Future acknowledge if the cancellation was successful
--- End diff --

what is returned if the cancellation was not successful?


> Port JobCancellationHandler to new REST endpoint
> 
>
> Key: FLINK-7650
> URL: https://issues.apache.org/jira/browse/FLINK-7650
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobCancellationHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

