[jira] [Commented] (FLINK-6392) Change the alias of Window from optional to essential.

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986023#comment-15986023
 ] 

ASF GitHub Bot commented on FLINK-6392:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3786

[FLINK-6392][table] Change the alias of Window from optional to essen…

In this PR the API definition is refactored to use the type system to 
enforce the window alias.
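
As an illustration of the approach, here is a minimal, hypothetical sketch (the type names below are illustrative only and are not the actual flink-table classes): only a window that has been given an alias has a type that `window()` accepts, so a missing `as 'w` becomes a compile-time error.
{code}
// Hypothetical sketch only: illustrative types, not the real Table API classes.
case class SlideWithAlias(size: Long, slide: Long, alias: Symbol)

case class SlideWithoutAlias(size: Long, slide: Long) {
  // Calling `as` is the only way to obtain the type that window() accepts.
  def as(alias: Symbol): SlideWithAlias = SlideWithAlias(size, slide, alias)
}

class TableSketch {
  // Accepts only aliased windows, so a window expression without an alias no longer compiles.
  def window(w: SlideWithAlias): TableSketch = this
}
{code}
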
- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6392][table] Change the alias of Window from optional to essential.")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6392-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3786.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3786


commit a9a2c0e8473da287cb5f77ba48ed9d5048e6a3a7
Author: sunjincheng121 
Date:   2017-04-27T01:58:18Z

[FLINK-6392][table] Change the alias of Window from optional to essential.




> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli  as 'w) 
>    .groupBy('w,'a,'b) 
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As we can see, the window alias is essential. However, the current Table API 
> implementation does not enforce the alias, so we must refactor the API 
> definition to use the type system to enforce it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3786: [FLINK-6392][table] Change the alias of Window fro...

2017-04-26 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3786

[FLINK-6392][table] Change the alias of Window from optional to essen…

In this PR the API definition is refactored to use the type system to 
enforce the window alias.
- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6392][table] Change the alias of Window from optional to essential.")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6392-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3786.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3786


commit a9a2c0e8473da287cb5f77ba48ed9d5048e6a3a7
Author: sunjincheng121 
Date:   2017-04-27T01:58:18Z

[FLINK-6392][table] Change the alias of Window from optional to essential.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985992#comment-15985992
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3777
  
> I can see this flooding the logs like crazy, especially with things like 
metrics and watermarks that update often. Some kind of filtering is probably 
necessary here.

You are right. The noisy log entries can be separated into a new log file.

> This seems to be about auditing, so completely different...

The main purpose is to record every access, like an audit log.
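
One way the separation could look is sketched below (a minimal sketch, assuming a dedicated SLF4J logger name such as "flink.web.access" that the logging configuration routes to its own appender/file; this is not the actual web frontend code, and the field names are illustrative):
{code}
import org.slf4j.LoggerFactory

object AccessLog {
  // A dedicated logger name lets log4j/logback send access entries to a separate file,
  // so frequently polled REST endpoints do not flood the main log.
  private val accessLogger = LoggerFactory.getLogger("flink.web.access")

  // Log one entry per handled HTTP request (fields are illustrative).
  def log(remoteAddress: String, method: String, uri: String, status: Int, durationMillis: Long): Unit = {
    accessLogger.info("{} \"{} {}\" {} {}ms",
      remoteAddress, method, uri, Int.box(status), Long.box(durationMillis))
  }
}
{code}
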


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record each user request in the access log, appending user accesses to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log

2017-04-26 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3777
  
> I can see this flooding the logs like crazy, especially with things like 
metrics and watermarks that update often. Some kind of filtering is probably 
necessary here.

You are right. The noisy log entries can be separated into a new log file.

> This seems to be about auditing, so completely different...

The main purpose is to record every access, like an audit log.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985934#comment-15985934
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora For the method `int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition 
count of the target topic can be used. However, the KafkaPartitioner's partition 
id array is initialized in `void open(int parallelInstanceId, int 
parallelInstances, int[] partitions)`, which is executed only once. So yes, the 
problem with dynamically created topics will still exist for users running the 
older KafkaPartitioner API in their existing jobs, and I find it hard to solve 
this problem completely.
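
To make this concrete, here is a minimal sketch against the two signatures quoted above (a locally defined trait is used so the example is self-contained; it is not the actual KafkaPartitioner class). It shows why a partitioner should prefer the per-record `partitionNum` over the array captured in `open()`:
{code}
// Self-contained illustration only; mirrors the quoted signatures, not the real class.
trait PartitionerSketch[T] extends Serializable {
  def open(parallelInstanceId: Int, parallelInstances: Int, partitions: Array[Int]): Unit
  def partition(next: T, serializedKey: Array[Byte], serializedValue: Array[Byte], partitionNum: Int): Int
}

class TargetTopicAwarePartitioner[T] extends PartitionerSketch[T] {
  // Captured once in open() and reflects only the default topic's partitions,
  // so it can be stale or wrong for other (possibly dynamically created) topics.
  private var defaultTopicPartitions: Array[Int] = Array.empty

  override def open(parallelInstanceId: Int, parallelInstances: Int, partitions: Array[Int]): Unit = {
    defaultTopicPartitions = partitions
  }

  override def partition(next: T, serializedKey: Array[Byte], serializedValue: Array[Byte], partitionNum: Int): Int = {
    // Use the partition count passed per record (the target topic's),
    // not defaultTopicPartitions.length.
    val hash = if (serializedKey == null) 0 else java.util.Arrays.hashCode(serializedKey)
    java.lang.Math.floorMod(hash, partitionNum)
  }
}
{code}
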

What do you think of this? @tzulitai 


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of 
> partitions of the default topic, and not the number of partitions of the 
> current {{targetTopic}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora For the method `int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition 
count of the target topic can be used. However, the KafkaPartitioner's partition 
id array is initialized in `void open(int parallelInstanceId, int 
parallelInstances, int[] partitions)`, which is executed only once. So yes, the 
problem with dynamically created topics will still exist for users running the 
older KafkaPartitioner API in their existing jobs, and I find it hard to solve 
this problem completely.

What do you think of this? @tzulitai 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3785: [FLINK-6337][network] Remove the buffer provider f...

2017-04-26 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/3785

[FLINK-6337][network] Remove the buffer provider from 
PartitionRequestServerHandler

Currently, `PartitionRequestServerHandler` creates a `LocalBufferPool` when the 
channel is registered. The `LocalBufferPool` is only used to obtain the segment 
size for creating the read view in `SpillableSubpartition`, and the buffers in 
the pool are never actually used, so it wastes buffer resources of the global pool.

We would like to remove the `LocalBufferPool` from the 
`PartitionRequestServerHandler`; the `LocalBufferPool` in `ResultPartition` can 
provide the segment size for creating the subpartition view instead.

This modification does not affect the current behavior and saves buffer resources.
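
A purely illustrative sketch of the intended wiring (hypothetical traits and names, not Flink's internal interfaces): the handler no longer owns a pool, and the segment size comes from the pool that the result partition already holds.
{code}
// Hypothetical types for illustration only.
trait BufferPoolSketch { def memorySegmentSize: Int }
trait ResultPartitionSketch { def bufferPool: BufferPoolSketch }

class PartitionRequestServerHandlerSketch {
  // Before: a handler-local LocalBufferPool existed only to expose the segment size.
  // After: ask the partition being read, so no extra buffers are reserved.
  def segmentSizeForView(partition: ResultPartitionSketch): Int =
    partition.bufferPool.memorySegmentSize
}
{code}
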

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-6337

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3785.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3785


commit 306f32414eabce5522ec2a8883aa27285e6aa3e4
Author: Zhijiang 
Date:   2017-04-26T08:18:54Z

[FLINK-6337][network] Remove the buffer provider from 
PartitionRequestServerHandler




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6337) Remove the buffer provider from PartitionRequestServerHandler

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985901#comment-15985901
 ] 

ASF GitHub Bot commented on FLINK-6337:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/3785

[FLINK-6337][network] Remove the buffer provider from 
PartitionRequestServerHandler

Currently, `PartitionRequestServerHandler` creates a `LocalBufferPool` when the 
channel is registered. The `LocalBufferPool` is only used to obtain the segment 
size for creating the read view in `SpillableSubpartition`, and the buffers in 
the pool are never actually used, so it wastes buffer resources of the global pool.

We would like to remove the `LocalBufferPool` from the 
`PartitionRequestServerHandler`; the `LocalBufferPool` in `ResultPartition` can 
provide the segment size for creating the subpartition view instead.

This modification does not affect the current behavior and saves buffer resources.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-6337

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3785.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3785


commit 306f32414eabce5522ec2a8883aa27285e6aa3e4
Author: Zhijiang 
Date:   2017-04-26T08:18:54Z

[FLINK-6337][network] Remove the buffer provider from 
PartitionRequestServerHandler




> Remove the buffer provider from PartitionRequestServerHandler
> -
>
> Key: FLINK-6337
> URL: https://issues.apache.org/jira/browse/FLINK-6337
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently, {{PartitionRequestServerHandler}} creates a {{LocalBufferPool}} 
> when the channel is registered. The {{LocalBufferPool}} is only used to obtain 
> the segment size for creating the read view in {{SpillableSubpartition}}, and 
> the buffers in the pool are never actually used, so it wastes buffer resources 
> of the global pool.
> We would like to remove the {{LocalBufferPool}} from the 
> {{PartitionRequestServerHandler}}; the {{LocalBufferPool}} in 
> {{ResultPartition}} can provide the segment size for creating the 
> subpartition view instead.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6247) Build a jar-with-dependencies for flink-table and put it into ./opt

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985890#comment-15985890
 ] 

ASF GitHub Bot commented on FLINK-6247:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3666
  
Hi @fhueske, thanks for your review, and sorry for the late reply. I have 
updated the PR. Please take a look. Thanks a lot.

Thanks,
SunJincheng


> Build a jar-with-dependencies for flink-table and put it into ./opt
> ---
>
> Key: FLINK-6247
> URL: https://issues.apache.org/jira/browse/FLINK-6247
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Due to a problem with Calcite and the unloading of classes, user-code 
> classloaders that include Calcite cannot be garbage collected. This is a 
> problem for long-running clusters that execute multiple Table API / SQL 
> programs with fat JARs that include the flink-table dependency. Each executed 
> program comes with its own user-code classloader that cannot be cleaned up 
> later.
> As a workaround, we recommend copying the flink-table dependency into the 
> ./lib folder. However, we do not have a jar file with all required transitive 
> dependencies (Calcite, Janino, etc). Hence, users would need to build this 
> jar file themselves or copy all jars into ./lib.
> This issue is about creating a jar-with-dependencies and adding it to the 
> ./opt folder. Users can then copy the jar file from ./opt to ./lib to include 
> the table API in the classpath of Flink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3666: [FLINK-6247][table] Build a jar-with-dependencies for fli...

2017-04-26 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3666
  
Hi @fhueske, thanks for your review, and sorry for the late reply. I have 
updated the PR. Please take a look. Thanks a lot.

Thanks,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6392) Change the alias of Window from optional to essential.

2017-04-26 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6392:
---
Description: 
Currently, The window clause use case looks like as following:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli  as 'w) 
   .groupBy('w,'a,'b) 
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As we see the alias of window is essential. But the current implementation of 
the TableAPI does not have the constraint for the alias,So we must refactoring 
the API definition using TYPE SYSTEM lead to constraint for the alias.

  was:
Currently, The window clause use case looks like as following:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli  as 'w) 
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As we see the alias of window is essential. But the current implementation of 
the TableAPI does not have the constraint for the alias,So we must refactoring 
the API definition using TYPE SYSTEM lead to constraint for the alias.


> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli  as 'w) 
>    .groupBy('w,'a,'b) 
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As we can see, the window alias is essential. However, the current Table API 
> implementation does not enforce the alias, so we must refactor the API 
> definition to use the type system to enforce it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6392) Change the alias of Window from optional to essential.

2017-04-26 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6392:
---
Description: 
Currently, The window clause use case looks like as following:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli  as 'w) 
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As we see the alias of window is essential. But the current implementation of 
the TableAPI does not have the constraint for the alias,So we must refactoring 
the API definition using TYPE SYSTEM lead to constraint for the alias.

  was:
Currently, The window clause use case looks like:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli  as 'w) 
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As we see the alias of window is essential. But the current implementation of 
the TableAPI does not have the constraint for the alias,So we must refactoring 
the API definition using TYPE SYSTEM lead to constraint for the alias.


> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> Currently, the window clause is used as follows:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli  as 'w) 
>    .groupBy('w,'a,'b) // WindowGroupedTable
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As we can see, the window alias is essential. However, the current Table API 
> implementation does not enforce the alias, so we must refactor the API 
> definition to use the type system to enforce it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6392) Change the alias of Window from optional to essential.

2017-04-26 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6392:
---
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-4557

> Change the alias of Window from optional to essential.
> --
>
> Key: FLINK-6392
> URL: https://issues.apache.org/jira/browse/FLINK-6392
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> Currently, the window clause use case looks like:
> {code}
> tab //Table('a,'b,'c)
>    .window( Slide over 10.milli every 5.milli  as 'w) 
>    .groupBy('w,'a,'b) // WindowGroupedTable
>    .select('a, 'b, 'c.sum, 'w.start, 'w.end)
> {code}
> As we can see, the window alias is essential. However, the current Table API 
> implementation does not enforce the alias, so we must refactor the API 
> definition to use the type system to enforce it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6392) Change the alias of Window from optional to essential.

2017-04-26 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6392:
--

 Summary: Change the alias of Window from optional to essential.
 Key: FLINK-6392
 URL: https://issues.apache.org/jira/browse/FLINK-6392
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.3.0
Reporter: sunjincheng
Assignee: sunjincheng
 Fix For: 1.3.0


Currently, the window clause use case looks like:
{code}
tab //Table('a,'b,'c)
   .window( Slide over 10.milli every 5.milli  as 'w) 
   .groupBy('w,'a,'b) // WindowGroupedTable
   .select('a, 'b, 'c.sum, 'w.start, 'w.end)
{code}
As we can see, the window alias is essential. However, the current Table API 
implementation does not enforce the alias, so we must refactor the API 
definition to use the type system to enforce it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6391) fix build for scala 2.11 (gelly-examples)

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985714#comment-15985714
 ] 

ASF GitHub Bot commented on FLINK-6391:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3784
  
@greghogan that might be a good idea but I don't know much about it.  I'm 
merely focused on fixing the build break.


> fix build for scala 2.11 (gelly-examples)
> -
>
> Key: FLINK-6391
> URL: https://issues.apache.org/jira/browse/FLINK-6391
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> After switching the build to Scala 2.11 (using 
> `tools/change-scala-version.sh`), the build fails in flink-dist module.
> {code}
> ...
> [INFO] flink-dist . FAILURE [ 19.337 
> s]
> [INFO] flink-fs-tests . SKIPPED
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 31:16 min
> [INFO] Finished at: 2017-04-26T15:17:43-07:00
> [INFO] Final Memory: 380M/1172M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> The root cause appears to be that the change-scala-version tool should update 
> flink-dist/.../assemblies/bin.xml to use the correct version of 
> flink-gelly-examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3784: FLINK-6391

2017-04-26 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3784
  
@greghogan that might be a good idea but I don't know much about it.  I'm 
merely focused on fixing the build break.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6360) Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh not changing flink-gelly-examples_2.10

2017-04-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985711#comment-15985711
 ] 

Greg Hogan commented on FLINK-6360:
---

[~jlaskowski], have you had a chance to work on this bug? It's impacting 
multiple developers and next Monday's code freeze necessitates building a 
release candidate.

> Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh 
> not changing flink-gelly-examples_2.10
> 
>
> Key: FLINK-6360
> URL: https://issues.apache.org/jira/browse/FLINK-6360
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.0
>Reporter: Jacek Laskowski
>Priority: Minor
>
> I'm building Flink for Scala 2.11 using the following command:
> {code}
> oss; cd flink; gco -- .; gl && \
> ./tools/change-scala-version.sh 2.11 && \
> mvn clean install -DskipTests -Dhadoop.version=2.7.3 -Dscala.version=2.11.7 
> {code}
> For the past couple of days I've been unable to build Flink because of the 
> following error:
> {code}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/jacek/dev/oss/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> I've been able to trace it down to {{flink-dist/src/main/assemblies/bin.xml}} 
> not being changed by {{./tools/change-scala-version.sh}} so the build tries 
> to include {{flink-gelly-examples_2.10}} rather than 
> {{flink-gelly-examples_2.11}}.
> Please comment if my reasoning is correct or not and I'd be happy to work on 
> it. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6391) fix build for scala 2.11 (gelly-examples)

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985710#comment-15985710
 ] 

ASF GitHub Bot commented on FLINK-6391:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3784
  
Any ideas why we can't just reference `scala.binary.version` in the Maven 
configurations rather than specifying `2.10` and modifying this in the change 
script? I don't see why `change-scala-version.sh` is any more complicated than 
`change-version.sh`.


> fix build for scala 2.11 (gelly-examples)
> -
>
> Key: FLINK-6391
> URL: https://issues.apache.org/jira/browse/FLINK-6391
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> After switching the build to Scala 2.11 (using 
> `tools/change-scala-version.sh`), the build fails in flink-dist module.
> {code}
> ...
> [INFO] flink-dist . FAILURE [ 19.337 
> s]
> [INFO] flink-fs-tests . SKIPPED
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 31:16 min
> [INFO] Finished at: 2017-04-26T15:17:43-07:00
> [INFO] Final Memory: 380M/1172M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> The root cause appears to be that the change-scala-version tool should update 
> flink-dist/.../assemblies/bin.xml to use the correct version of 
> flink-gelly-examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3784: FLINK-6391

2017-04-26 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3784
  
Any ideas why we can't just reference `scala.binary.version` in the Maven 
configurations rather than specifying `2.10` and modifying this in the change 
script? I don't see why `change-scala-version.sh` is any more complicated than 
`change-version.sh`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-04-26 Thread heytitle
Github user heytitle commented on the issue:

https://github.com/apache/flink/pull/3511
  
I also thought about the abstract class, but I'm not sure how to do it 
properly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6360) Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh not changing flink-gelly-examples_2.10

2017-04-26 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985700#comment-15985700
 ] 

Eron Wright  commented on FLINK-6360:
-

I fixed this and opened a PR at the same time as Greg marked it as a dup.  
Anyway here's the PR:

https://github.com/apache/flink/pull/3784

> Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh 
> not changing flink-gelly-examples_2.10
> 
>
> Key: FLINK-6360
> URL: https://issues.apache.org/jira/browse/FLINK-6360
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.0
>Reporter: Jacek Laskowski
>Priority: Minor
>
> I'm building Flink for Scala 2.11 using the following command:
> {code}
> oss; cd flink; gco -- .; gl && \
> ./tools/change-scala-version.sh 2.11 && \
> mvn clean install -DskipTests -Dhadoop.version=2.7.3 -Dscala.version=2.11.7 
> {code}
> For the past couple of days I've been unable to build Flink because of the 
> following error:
> {code}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/jacek/dev/oss/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> I've been able to trace it down to {{flink-dist/src/main/assemblies/bin.xml}} 
> not being changed by {{./tools/change-scala-version.sh}} so the build tries 
> to include {{flink-gelly-examples_2.10}} rather than 
> {{flink-gelly-examples_2.11}}.
> Please comment if my reasoning is correct or not and I'd be happy to work on 
> it. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6360) Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh not changing flink-gelly-examples_2.10

2017-04-26 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985700#comment-15985700
 ] 

Eron Wright  edited comment on FLINK-6360 at 4/26/17 10:55 PM:
---

I fixed this and opened a PR at the same time as Greg marked FLINK-6391 as a 
dup.  Anyway here's the PR:

https://github.com/apache/flink/pull/3784


was (Author: eronwright):
I fixed this and opened a PR at the same time as Greg marked it as a dup.  
Anyway here's the PR:

https://github.com/apache/flink/pull/3784

> Failed to create assembly for Scala 2.11 due to tools/change-scala-version.sh 
> not changing flink-gelly-examples_2.10
> 
>
> Key: FLINK-6360
> URL: https://issues.apache.org/jira/browse/FLINK-6360
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.0
>Reporter: Jacek Laskowski
>Priority: Minor
>
> I'm building Flink for Scala 2.11 using the following command:
> {code}
> oss; cd flink; gco -- .; gl && \
> ./tools/change-scala-version.sh 2.11 && \
> mvn clean install -DskipTests -Dhadoop.version=2.7.3 -Dscala.version=2.11.7 
> {code}
> For the past couple of days I've been unable to build Flink because of the 
> following error:
> {code}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/jacek/dev/oss/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> I've been able to trace it down to {{flink-dist/src/main/assemblies/bin.xml}} 
> not being changed by {{./tools/change-scala-version.sh}} so the build tries 
> to include {{flink-gelly-examples_2.10}} rather than 
> {{flink-gelly-examples_2.11}}.
> Please comment if my reasoning is correct or not and I'd be happy to work on 
> it. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6391) fix build for scala 2.11 (gelly-examples)

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985699#comment-15985699
 ] 

ASF GitHub Bot commented on FLINK-6391:
---

GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/3784

FLINK-6391

FLINK-6391
Improve change-scala-version.sh to correctly deal with references to 
flink-gelly-examples.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-6391

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3784.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3784


commit 0985697f071133abf0a512a4a79834a1ccd19cde
Author: Wright, Eron 
Date:   2017-04-26T22:48:57Z

FLINK-6391
Improve change-scala-version.sh to correctly deal with references to 
flink-gelly-examples.




> fix build for scala 2.11 (gelly-examples)
> -
>
> Key: FLINK-6391
> URL: https://issues.apache.org/jira/browse/FLINK-6391
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> After switching the build to Scala 2.11 (using 
> `tools/change-scala-version.sh`), the build fails in flink-dist module.
> {code}
> ...
> [INFO] flink-dist . FAILURE [ 19.337 
> s]
> [INFO] flink-fs-tests . SKIPPED
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 31:16 min
> [INFO] Finished at: 2017-04-26T15:17:43-07:00
> [INFO] Final Memory: 380M/1172M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> The root cause appears to be that the change-scala-version tool should update 
> flink-dist/.../assemblies/bin.xml to use the correct version of 
> flink-gelly-examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3784: FLINK-6391

2017-04-26 Thread EronWright
GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/3784

FLINK-6391

FLINK-6391
Improve change-scala-version.sh to correctly deal with references to 
flink-gelly-examples.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-6391

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3784.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3784


commit 0985697f071133abf0a512a4a79834a1ccd19cde
Author: Wright, Eron 
Date:   2017-04-26T22:48:57Z

FLINK-6391
Improve change-scala-version.sh to correctly deal with references to 
flink-gelly-examples.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6391) fix build for scala 2.11 (gelly-examples)

2017-04-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-6391.
-
Resolution: Duplicate

> fix build for scala 2.11 (gelly-examples)
> -
>
> Key: FLINK-6391
> URL: https://issues.apache.org/jira/browse/FLINK-6391
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> After switching the build to Scala 2.11 (using 
> `tools/change-scala-version.sh`), the build fails in flink-dist module.
> {code}
> ...
> [INFO] flink-dist . FAILURE [ 19.337 
> s]
> [INFO] flink-fs-tests . SKIPPED
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 31:16 min
> [INFO] Finished at: 2017-04-26T15:17:43-07:00
> [INFO] Final Memory: 380M/1172M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
> flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
> /Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
>  -> [Help 1]
> {code}
> The root cause appears to be that the change-scala-version tool should update 
> flink-dist/.../assemblies/bin.xml to use the correct version of 
> flink-gelly-examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6391) fix build for scala 2.11 (gelly-examples)

2017-04-26 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-6391:
---

 Summary: fix build for scala 2.11 (gelly-examples)
 Key: FLINK-6391
 URL: https://issues.apache.org/jira/browse/FLINK-6391
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Eron Wright 
Assignee: Eron Wright 


After switching the build to Scala 2.11 (using 
`tools/change-scala-version.sh`), the build fails in flink-dist module.

{code}
...
[INFO] flink-dist . FAILURE [ 19.337 s]
[INFO] flink-fs-tests . SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 31:16 min
[INFO] Finished at: 2017-04-26T15:17:43-07:00
[INFO] Final Memory: 380M/1172M
[INFO] 
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-assembly-plugin:2.6:single (bin) on project 
flink-dist_2.11: Failed to create assembly: Error adding file to archive: 
/Users/wrighe3/Projects/flink/flink-dist/../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-1.3-SNAPSHOT.jar
 -> [Help 1]
{code}

The root cause appears to be that the change-scala-version tool should update 
flink-dist/.../assemblies/bin.xml to use the correct version of 
flink-gelly-examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-04-26 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3511
  
@heytitle, you can `rebase -i dd20^` and then delete the first two lines, 
which removes those commits from the history.

An initial thought from skimming the code: should we create an abstract 
`NormalizedKeySorterBase` with common code from the generated and non-generated 
implementations? This way the lines of code in the template would be minimized 
and we wouldn't need to synchronize changes. I don't see a reason why this 
would decrease performance.
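
A rough sketch of that idea (hypothetical names, not an actual Flink class): shared bookkeeping lives in an abstract base class, and the code-generation template only emits the specialized hot path.
{code}
// Illustration only; names are hypothetical.
abstract class NormalizedKeySorterBaseSketch {
  // Common, non-performance-critical state and logic that need not be generated.
  protected var numRecords: Int = 0
  def reset(): Unit = { numRecords = 0 }

  // The hot path that a generated subclass specializes.
  def compare(i: Int, j: Int): Int
}

// What a generated sorter might look like for a fixed-length long key.
class GeneratedLongKeySorter extends NormalizedKeySorterBaseSketch {
  private val keys = new Array[Long](1024)
  override def compare(i: Int, j: Int): Int = java.lang.Long.compare(keys(i), keys(j))
}
{code}
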


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985629#comment-15985629
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user heytitle commented on the issue:

https://github.com/apache/flink/pull/3511
  
@greghogan May I ask how to remove the `FLINK-3722` commits? The only way I 
can think of is `git rebase -i`, but this will rewrite the history of this PR.


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
> Fix For: 1.3.0
>
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
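
The incremental index calculation described above could look roughly like this (a hedged sketch with illustrative names, not Flink's actual InMemorySorter code): instead of computing the segment index and offset with a division per access, an iterator steps both values by one and wraps the modulo.
{code}
// Illustration only: replaces "index / recordsPerSegment" and "index % recordsPerSegment"
// with incremental updates while the index moves by one.
final class SortIndexIterator(recordsPerSegment: Int, recordSize: Int) {
  private var segmentIndex = 0     // the quotient
  private var recordInSegment = 0  // the modulo

  def increment(): Unit = {
    recordInSegment += 1
    if (recordInSegment == recordsPerSegment) { // wrap around instead of dividing
      recordInSegment = 0
      segmentIndex += 1
    }
  }

  def decrement(): Unit = {
    if (recordInSegment == 0) {
      recordInSegment = recordsPerSegment - 1
      segmentIndex -= 1
    } else {
      recordInSegment -= 1
    }
  }

  def currentSegment: Int = segmentIndex
  def currentOffsetBytes: Int = recordInSegment * recordSize
}
{code}
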



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-04-26 Thread heytitle
Github user heytitle commented on the issue:

https://github.com/apache/flink/pull/3511
  
@greghogan May I ask how to remove the `FLINK-3722` commits? The only way I 
can think of is `git rebase -i`, but this will rewrite the history of this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-6390.
---

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param <T> The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator when triggering a checkpoint,
>* prior to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
>* @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>* @param timestamp The wall clock timestamp when the checkpoint was 
> triggered, for
>*  info/logging purposes. 
> 

[jira] [Resolved] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-6390.
-
Resolution: Fixed

Implemented in 90ca438106e63c5032ee2ad27e54e9f573eac386

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param <T> The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator when triggering a checkpoint,
>* prior to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
>* @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>* @param timestamp The wall clock timestamp when the 

[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985621#comment-15985621
 ] 

ASF GitHub Bot commented on FLINK-6390:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3782


> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param <T> The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator when triggering a 
> checkpoint, prior to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
>* @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>* 
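
To make the hook contract quoted above concrete, here is a minimal, self-contained sketch. The TriggerRestoreHook interface below is only a stand-in written from the Javadoc in this message (the real interface and its exact signatures are defined in the pull request); ExternalSystemHook, its identifier string, and the marker values are made-up examples.

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Stand-in for the hook interface described above (illustration only).
interface TriggerRestoreHook<T> {
	String getIdentifier();
	CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
	void restoreCheckpoint(long checkpointId, T checkpointData) throws Exception;
}

// Example hook that prepares an external system before each checkpoint.
class ExternalSystemHook implements TriggerRestoreHook<String> {

	@Override
	public String getIdentifier() {
		// Deterministic identifier (e.g. storage system + namespace) so that a
		// resumed savepoint hands the right data back to this hook.
		return "my-external-system:my-namespace";
	}

	@Override
	public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
		// Do the preparatory work asynchronously on the given executor and complete
		// the future with the data to store in the checkpoint metadata (or null).
		return CompletableFuture.supplyAsync(() -> "marker-" + checkpointId, executor);
	}

	@Override
	public void restoreCheckpoint(long checkpointId, String checkpointData) {
		// Use the data stored under this hook's identifier to reset the external system.
		System.out.println("Restoring external system state: " + checkpointData);
	}
}
{code}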

[GitHub] flink pull request #3782: [FLINK-6390] [checkpoints] Add API for checkpoints...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3782




[jira] [Commented] (FLINK-4370) Offer a default IntelliJ inspection profile with Flink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985606#comment-15985606
 ] 

ASF GitHub Bot commented on FLINK-4370:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2352
  
@StephanEwen can these be shared under `tools/idea/...`? If they are 
installed by default then any user modifications will be noted by git. 
Conversely, most contributors won't notice these files or synchronize as 
additional inspections are enabled, but as of now developers are on their own.


> Offer a default IntelliJ inspection profile with Flink
> --
>
> Key: FLINK-4370
> URL: https://issues.apache.org/jira/browse/FLINK-4370
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> We can commit an inspection profile under {{.idea/inspectionProfiles}} which 
> should be automatically picked up when the code is checked out and imported 
> into IntelliJ





[GitHub] flink issue #2352: [FLINK-4370] Add an IntelliJ Inspections Profile

2017-04-26 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2352
  
@StephanEwen can these be shared under `tools/idea/...`? If they are 
installed by default then any user modifications will be noted by git. 
Conversely, most contributors won't notice these files or synchronize as 
additional inspections are enabled, but as of now developers are on their own.




[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java

2017-04-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985578#comment-15985578
 ] 

Greg Hogan commented on FLINK-6107:
---

[~StephanEwen], I think there was a perception that import order was not as 
consistent throughout the project. Previously there didn't seem to be strong 
opinions either way. I suspect the import order has drifted out-of-sync from 
your original work because this has not been enforced.

> Add custom checkstyle for flink-streaming-java
> --
>
> Key: FLINK-6107
> URL: https://issues.apache.org/jira/browse/FLINK-6107
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.3.0
>
>
> There was some consensus on the ML 
> (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E)
>  that we want to have a more uniform code style. We should start 
> module-by-module and introduce increasingly strict rules. We have to 
> be aware of the PR situation and ensure that we cause minimal breakage for 
> contributors.
> This issue aims at adding a custom checkstyle.xml for 
> {{flink-streaming-java}} that is based on our current checkstyle.xml but adds 
> these checks for Javadocs:
> {code}
> <!-- Javadoc-related checkstyle module definitions (the XML tags were stripped by the mail archive) -->
> {code}
> This checks:
>  - Every type has a type-level Javadoc
>  - Proper use of {{}} in Javadocs
>  - First sentence must end with a proper punctuation mark
>  - Proper use (including closing) of HTML tags
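
For illustration, a type-level Javadoc that would pass all four of these checks might look like the following (the class name and wording are made up):

{code}
/**
 * Utility that buffers elements before forwarding them downstream.
 *
 * <p>The buffer is flushed when it is full or when a checkpoint is
 * triggered, whichever happens first.</p>
 */
public class BufferingForwarder {
}
{code}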





[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985498#comment-15985498
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks for the pointer to the prototype!

> Do you intend to provide exactly-once guarantees for arbitrary updates?

Thinking about it a little more, I think it makes sense to start with 
at-least-once semantics first. In practice we make the JDBC call 
idempotent using `INSERT IF NOT EXISTS`.

The exactly-once part is trickier, so let's separate it out for now. 
What do you think?



> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.





[GitHub] flink issue #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3712
  
Thanks for the pointer to the prototype!

> Do you intend to provide exactly-once guarantees for arbitrary updates?

Thinking about it a little more, I think it makes sense to start with 
at-least-once semantics first. In practice we make the JDBC call 
idempotent using `INSERT IF NOT EXISTS`.

The exactly-once part is trickier, so let's separate it out for now. 
What do you think?
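
For illustration, here is a minimal sketch of such an idempotent write through plain JDBC. The connection URL, table, columns, and the PostgreSQL-style ON CONFLICT clause are assumptions made for this sketch; the exact idempotent statement depends on the target database.

{code}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class IdempotentJdbcWrite {
	public static void main(String[] args) throws SQLException {
		// Hypothetical connection URL and table; replace with the real sink configuration.
		try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/flink");
			PreparedStatement stmt = conn.prepareStatement(
				// Replaying this statement after a failure is a no-op, which is
				// what makes at-least-once delivery safe for this sink.
				"INSERT INTO events (id, payload) VALUES (?, ?) ON CONFLICT (id) DO NOTHING")) {
			stmt.setLong(1, 42L);
			stmt.setString(2, "hello");
			stmt.executeUpdate();
		}
	}
}
{code}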





[jira] [Commented] (FLINK-6293) Flakey JobManagerITCase

2017-04-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985432#comment-15985432
 ] 

Stephan Ewen commented on FLINK-6293:
-

Hitting this frequently on local builds as well:

{code}
Tests run: 21, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1,220.166 sec 
<<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerITCase
The JobManager actor must handle trigger savepoint response for non-existing 
job(org.apache.flink.runtime.jobmanager.JobManagerITCase)  Time elapsed: 
1,199.316 sec  <<< FAILURE!
java.lang.AssertionError: assertion failed: timeout (1199213200030 nanoseconds) 
during expectMsgClass waiting for class 
org.apache.flink.runtime.messages.JobManagerMessages$TriggerSavepointFailure
at scala.Predef$.assert(Predef.scala:179)
at 
akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423)
at akka.testkit.TestKitBase$class.expectMsgType(TestKit.scala:405)
at akka.testkit.TestKit.expectMsgType(TestKit.scala:718)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34$$anonfun$apply$mcV$sp$35.apply$mcV$sp(JobManagerITCase.scala:772)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34$$anonfun$apply$mcV$sp$35.apply(JobManagerITCase.scala:764)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34$$anonfun$apply$mcV$sp$35.apply(JobManagerITCase.scala:764)
at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
at akka.testkit.TestKit.within(TestKit.scala:718)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34.apply$mcV$sp(JobManagerITCase.scala:764)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34.apply(JobManagerITCase.scala:758)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase$$anonfun$1$$anonfun$apply$mcV$sp$34.apply(JobManagerITCase.scala:758)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at 
org.apache.flink.runtime.jobmanager.JobManagerITCase.withFixture(JobManagerITCase.scala:50)

{code}

> Flakey JobManagerITCase
> ---
>
> Key: FLINK-6293
> URL: https://issues.apache.org/jira/browse/FLINK-6293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> Quite seldom, {{JobManagerITCase}} seems to hang, e.g. see 
> https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true 
> The Maven watchdog kills the build because no output is produced within 
> 300s, and {{JobManagerITCase}} seems to hang at line 772, i.e.
> {code:title=JobManagerITCase lines 
> 770-772|language=java|linenumbers=true|firstline=770}
> // Trigger savepoint for non-existing job
> jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor)
> val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
> {code}
> Although the (downloaded) logs do not quite allow a precise mapping to this 
> test case, it looks as if the following block may be related:
> {code}
> 09:34:47,684 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Akka ask timeout set to 100s
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Disabled queryable state server
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Starting FlinkMiniCluster.
> 09:34:47,809 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 09:34:47,837 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062
> 09:34:47,838 WARN  org.apache.flink.runtime.net.SSLUtils  
>- Not a SSL socket, will skip setting tls version and cipher suites.
> 09:34:47,839 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max 
> backlog: 1000
> 09:34:47,840 INFO  

[jira] [Updated] (FLINK-6293) Flakey JobManagerITCase

2017-04-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-6293:

Priority: Critical  (was: Major)

> Flakey JobManagerITCase
> ---
>
> Key: FLINK-6293
> URL: https://issues.apache.org/jira/browse/FLINK-6293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> Quite seldom, {{JobManagerITCase}} seems to hang, e.g. see 
> https://api.travis-ci.org/jobs/220888193/log.txt?deansi=true 
> The Maven watchdog kills the build because no output is produced within 
> 300s, and {{JobManagerITCase}} seems to hang at line 772, i.e.
> {code:title=JobManagerITCase lines 
> 770-772|language=java|linenumbers=true|firstline=770}
> // Trigger savepoint for non-existing job
> jobManager.tell(TriggerSavepoint(jobId, Option.apply("any")), testActor)
> val response = expectMsgType[TriggerSavepointFailure](deadline.timeLeft)
> {code}
> Although the (downloaded) logs do not quite allow a precise mapping to this 
> test case, it looks as if the following block may be related:
> {code}
> 09:34:47,684 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Akka ask timeout set to 100s
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Disabled queryable state server
> 09:34:47,777 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  
>- Starting FlinkMiniCluster.
> 09:34:47,809 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 09:34:47,837 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-eab23d04-ea18-4dc5-b1df-fcf9fc295062
> 09:34:47,838 WARN  org.apache.flink.runtime.net.SSLUtils  
>- Not a SSL socket, will skip setting tls version and cipher suites.
> 09:34:47,839 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:36745 - max concurrent requests: 50 - max 
> backlog: 1000
> 09:34:47,840 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>- No metrics reporter configured, no metrics will be exposed/reported.
> 09:34:47,850 INFO  
> org.apache.flink.runtime.testingUtils.TestingMemoryArchivist  - Started 
> memory archivist akka://flink/user/archive_1
> 09:34:47,860 INFO  org.apache.flink.runtime.testutils.TestingResourceManager  
>- Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 09:34:47,861 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager
>- Starting JobManager at akka://flink/user/jobmanager_1.
> 09:34:47,862 WARN  org.apache.flink.runtime.testingUtils.TestingJobManager
>- Discard message 
> LeaderSessionMessage(----,TriggerSavepoint(6e813070338a23b0ff571646bca56521,Some(any)))
>  because there is currently no valid leader id known.
> 09:34:47,862 INFO  org.apache.flink.runtime.testingUtils.TestingJobManager
>- JobManager akka://flink/user/jobmanager_1 was granted leadership with 
> leader session ID Some(----).
> 09:34:47,867 INFO  org.apache.flink.runtime.testutils.TestingResourceManager  
>- Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager_1#-652927556] - leader session 
> ----
> {code}
> If so, then this may be related to FLINK-6287 and may possibly even be a 
> duplicate.
> What is strange though is that the timeout for the expected message to arrive 
> is no more than 2m and thus the test should properly fail within 300s.





[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985423#comment-15985423
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113541703
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
--- End diff --

I see. In that case I would suggest importing the Flink Row type and using 
the fully qualified name for the Datastax Row.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.





[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113541703
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
--- End diff --

I see. In that case I would suggest importing the Flink Row type and using 
the fully qualified name for the Datastax Row.




[jira] [Commented] (FLINK-6287) Flakey JobManagerRegistrationTest

2017-04-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985404#comment-15985404
 ] 

Stephan Ewen commented on FLINK-6287:
-

A recent PR has addressed some instability, but I am still seeing failures 
quite frequently on a local build:

{code}
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest
The JobManager should handle repeated registration 
calls(org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest)  Time 
elapsed: 3.103 sec  <<< FAILURE!
java.lang.AssertionError: assertion failed: timeout (3 seconds) during 
expectMsgClass waiting for class 
org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
at scala.Predef$.assert(Predef.scala:179)
at 
akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423)
at akka.testkit.TestKitBase$class.expectMsgType(TestKit.scala:396)
at akka.testkit.TestKit.expectMsgType(TestKit.scala:718)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcV$sp(JobManagerRegistrationTest.scala:188)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:163)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(JobManagerRegistrationTest.scala:163)
at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
at akka.testkit.TestKit.within(TestKit.scala:718)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerRegistrationTest.scala:163)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:142)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest$$anonfun$1$$anonfun$apply$mcV$sp$4.apply(JobManagerRegistrationTest.scala:142)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.withFixture(JobManagerRegistrationTest.scala:51)
{code}



> Flakey JobManagerRegistrationTest
> -
>
> Key: FLINK-6287
> URL: https://issues.apache.org/jira/browse/FLINK-6287
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.3.0
> Environment: unit tests
>Reporter: Nico Kruber
>  Labels: test-stability
>
> There seems to be a race condition in the "{{JobManagerRegistrationTest.The 
> JobManager should handle repeated registration calls}}" (scala) unit test.
> Every so often, especially when my system is under load, this test fails with 
> a timeout after seeing the following messages in the log4j INFO outputs:
> {code}
> 14:18:42,257 INFO org.apache.flink.runtime.testutils.TestingResourceManager - 
> Trying to associate with JobManager leader akka://flink/user/$f#-1062324203
> 14:18:42,253 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting 
> JobManager at akka://flink/user/$f.
> 14:18:42,258 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
> message 
> LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c
>  @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, 
> heap=1556938752, managed=10,1)) because there is currently no valid leader id 
> known.
> 14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
> message 
> LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c
>  @ nico-work.fritz.box (dataPort=1),cores=4, physMem=16686931968, 
> heap=1556938752, managed=10,1)) because there is currently no valid leader id 
> known.
> 14:18:42,259 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard 
> message 
> LeaderSessionMessage(----,RegisterTaskManager(8b6ad3eccdbfcb199df630b794b6ec0c,8b6ad3eccdbfcb199df630b794b6ec0c
>  @ nico-work.fritz.box 

[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985378#comment-15985378
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113446953
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointFrom12MigrationITCase.java
 ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.2 
savepoint.
+ *
+ * The test pipeline contains both "Checkpointed" state and keyed user 
state.
+ *
+ * The tests will time out if they don't see the required number of 
successful checks within
+ * a time limit.
+ */
+public class StatefulUDFSavepointFrom12MigrationITCase extends 
SavepointMigrationTestBase {
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   /**
+* This has to be manually executed to create the savepoint on Flink 
1.2.
+*/
+   @Test
+   @Ignore
+   public void testCreateSavepointOnFlink12() throws Exception {
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+   // we only test memory state backend yet
+   env.setStateBackend(new MemoryStateBackend());
+   env.enableCheckpointing(500);
+   env.setParallelism(4);
+   

[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985379#comment-15985379
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113441899
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 ---
@@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, 
long checkpointId, long timest
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
+   boolean readFlag = false;
if (userFunction instanceof Checkpointed ||
-   (userFunction instanceof CheckpointedRestoring 
&& in instanceof Migration)) {
+   (userFunction instanceof 
CheckpointedRestoring)) {
--- End diff --

remove braces around `userFunction instanceof CheckpointedRestoring`; could 
also move it to the previous line.
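
In other words, roughly (a sketch of the suggested formatting, not the final patch):

{code}
if (userFunction instanceof Checkpointed || userFunction instanceof CheckpointedRestoring) {
	// ...
}
{code}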


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}





[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985375#comment-15985375
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113435588
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) 
throws JobExecutionException
}
 
/**
+* Reattaches to a running from from the supplied job id
--- End diff --

typo: from -> job


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}





[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985381#comment-15985381
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113435697
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) 
throws JobExecutionException
}
 
/**
+* Reattaches to a running from from the supplied job id
+*
+* @param jobID The job id of the job to attach to
+* @return The JobExecutionResult for the jobID
+* @throws JobExecutionException if an error occurs during monitoring 
the job execution
+*/
+   public JobListeningContext connectToJob(JobID jobID) throws 
JobExecutionException {
+   final LeaderRetrievalService leaderRetrievalService;
+   try {
+   leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
+   }
+
+   ActorGateway jobManagerGateway;
+   try {
+   jobManagerGateway = getJobManagerGateway();
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway");
--- End diff --

Include the exception
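
Concretely, the suggestion is to pass the caught exception along as the cause, mirroring the first catch block in the same diff (a sketch of the suggested change, not the final patch):

{code}
try {
	jobManagerGateway = getJobManagerGateway();
} catch (Exception e) {
	// chain the original exception so the root cause is not lost
	throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
}
{code}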


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}





[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985380#comment-15985380
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113435639
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) 
throws JobExecutionException
}
 
/**
+* Reattaches to a running from from the supplied job id
+*
+* @param jobID The job id of the job to attach to
+* @return The JobExecutionResult for the jobID
+* @throws JobExecutionException if an error occurs during monitoring 
the job execution
+*/
+   public JobListeningContext connectToJob(JobID jobID) throws 
JobExecutionException {
+   final LeaderRetrievalService leaderRetrievalService;
+   try {
+   leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
+   }
+
+   ActorGateway jobManagerGateway;
+   try {
+   jobManagerGateway = getJobManagerGateway();
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway");
+   }
+
+   return JobClient.attachToRunningJob(
+   jobID,
+   jobManagerGateway,
+   flinkConfig,
+   actorSystemLoader.get(),
+   leaderRetrievalService,
+   timeout,
+   printStatusDuringExecution);
+   }
+
+
--- End diff --

remove empty line


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}





[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985376#comment-15985376
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113442453
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 ---
@@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, 
long checkpointId, long timest
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
+   boolean readFlag = false;
--- End diff --

The variable name is a bit ambiguous (it could also be read as "you have to 
read the flag").


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}





[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985377#comment-15985377
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113440360
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
 ---
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import java.io.FileOutputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+public class ContinuousFileProcessingFrom12MigrationTest {
+
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 100;
+
+   @ClassRule
+   public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+   /**
+* Manually run this to write binary snapshot data. Remove @Ignore to 
run.
+*/
+   @Ignore
+   @Test
+   public void writeReaderSnapshot() throws Exception {
+
+   File testFolder = tempFolder.newFolder();
+
+   TimestampedFileInputSplit split1 =
+   new TimestampedFileInputSplit(0, 3, new 
Path("test/test1"), 0, 100, null);
+
+   TimestampedFileInputSplit split2 =
+   new TimestampedFileInputSplit(10, 2, new 
Path("test/test2"), 101, 200, null);
+
+   TimestampedFileInputSplit split3 =
+   new TimestampedFileInputSplit(10, 1, new 
Path("test/test2"), 0, 100, null);
+
+   TimestampedFileInputSplit split4 =
+   new TimestampedFileInputSplit(11, 0, new 
Path("test/test3"), 0, 100, null);
+
+   final OneShotLatch latch = new OneShotLatch();
+   BlockingFileInputFormat format = new 
BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+   ContinuousFileReaderOperator initReader = new 
ContinuousFileReaderOperator<>(
+   format);
+   

[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113441899
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 ---
@@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, 
long checkpointId, long timest
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
+   boolean readFlag = false;
if (userFunction instanceof Checkpointed ||
-   (userFunction instanceof CheckpointedRestoring 
&& in instanceof Migration)) {
+   (userFunction instanceof 
CheckpointedRestoring)) {
--- End diff --

remove braces around `userFunction instanceof CheckpointedRestoring`; could 
also move it to the previous line.




[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113435639
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) 
throws JobExecutionException
}
 
/**
+* Reattaches to a running from from the supplied job id
+*
+* @param jobID The job id of the job to attach to
+* @return The JobExecutionResult for the jobID
+* @throws JobExecutionException if an error occurs during monitoring 
the job execution
+*/
+   public JobListeningContext connectToJob(JobID jobID) throws 
JobExecutionException {
+   final LeaderRetrievalService leaderRetrievalService;
+   try {
+   leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
+   }
+
+   ActorGateway jobManagerGateway;
+   try {
+   jobManagerGateway = getJobManagerGateway();
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway");
+   }
+
+   return JobClient.attachToRunningJob(
+   jobID,
+   jobManagerGateway,
+   flinkConfig,
+   actorSystemLoader.get(),
+   leaderRetrievalService,
+   timeout,
+   printStatusDuringExecution);
+   }
+
+
--- End diff --

remove empty line




[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113435697
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) 
throws JobExecutionException
}
 
/**
+* Reattaches to a running from from the supplied job id
+*
+* @param jobID The job id of the job to attach to
+* @return The JobExecutionResult for the jobID
+* @throws JobExecutionException if an error occurs during monitoring 
the job execution
+*/
+   public JobListeningContext connectToJob(JobID jobID) throws 
JobExecutionException {
+   final LeaderRetrievalService leaderRetrievalService;
+   try {
+   leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
+   }
+
+   ActorGateway jobManagerGateway;
+   try {
+   jobManagerGateway = getJobManagerGateway();
+   } catch (Exception e) {
+   throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway");
--- End diff --

Include the exception




[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113446953
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointFrom12MigrationITCase.java
 ---
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.2 
savepoint.
+ *
+ * The test pipeline contains both "Checkpointed" state and keyed user 
state.
+ *
+ * The tests will time out if they don't see the required number of 
successful checks within
+ * a time limit.
+ */
+public class StatefulUDFSavepointFrom12MigrationITCase extends 
SavepointMigrationTestBase {
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   /**
+* This has to be manually executed to create the savepoint on Flink 
1.2.
+*/
+   @Test
+   @Ignore
+   public void testCreateSavepointOnFlink12() throws Exception {
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+   // we only test memory state backend yet
+   env.setStateBackend(new MemoryStateBackend());
+   env.enableCheckpointing(500);
+   env.setParallelism(4);
+   env.setMaxParallelism(4);
+
+   env
+   .addSource(new 
LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+   .flatMap(new 

[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113442453
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 ---
@@ -155,12 +155,14 @@ public void snapshotState(FSDataOutputStream out, 
long checkpointId, long timest
 
@Override
public void restoreState(FSDataInputStream in) throws Exception {
+   boolean readFlag = false;
--- End diff --

The variable name is a bit ambiguous (it could also be read as "you have to 
read the flag").




[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113435588
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -489,6 +489,39 @@ public JobExecutionResult retrieveJob(JobID jobID) 
throws JobExecutionException
}
 
/**
+* Reattaches to a running from from the supplied job id
--- End diff --

typo: from -> job




[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3778#discussion_r113440360
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
 ---
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import java.io.FileOutputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+public class ContinuousFileProcessingFrom12MigrationTest {
+
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 100;
+
+   @ClassRule
+   public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+   /**
+* Manually run this to write binary snapshot data. Remove @Ignore to run.
+*/
+   @Ignore
+   @Test
+   public void writeReaderSnapshot() throws Exception {
+
+   File testFolder = tempFolder.newFolder();
+
+   TimestampedFileInputSplit split1 =
+   new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+   TimestampedFileInputSplit split2 =
+   new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+   TimestampedFileInputSplit split3 =
+   new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+   TimestampedFileInputSplit split4 =
+   new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+   final OneShotLatch latch = new OneShotLatch();
+   BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
+   TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+   ContinuousFileReaderOperator initReader = new ContinuousFileReaderOperator<>(format);
+   initReader.setOutputType(typeInfo, new ExecutionConfig());
+   OneInputStreamOperatorTestHarness testHarness =
+   new OneInputStreamOperatorTestHarness<>(initReader);
 

[jira] [Commented] (FLINK-6175) HistoryServerTest.testFullArchiveLifecycle fails

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985286#comment-15985286
 ] 

ASF GitHub Bot commented on FLINK-6175:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3655
  
While we are at hardening: the test seems to work with a fixed port. That is bound 
to cause failures due to port conflicts (for example, 8082 is commonly used if 
you have a local HA Flink setup for testing).


> HistoryServerTest.testFullArchiveLifecycle fails
> 
>
> Key: FLINK-6175
> URL: https://issues.apache.org/jira/browse/FLINK-6175
> Project: Flink
>  Issue Type: Test
>  Components: History Server, Tests, Webfrontend
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/213933823/log.txt
> {code}
> testFullArchiveLifecycle(org.apache.flink.runtime.webmonitor.history.HistoryServerTest)
>   Time elapsed: 2.162 sec  <<< FAILURE!
> java.lang.AssertionError: /joboverview.json did not contain valid json
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertNotNull(Assert.java:712)
>   at 
> org.apache.flink.runtime.webmonitor.history.HistoryServerTest.testFullArchiveLifecycle(HistoryServerTest.java:98)
> {code}
> Happened on a branch with unrelated changes [~Zentol].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
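
For illustration, one common way to harden such a test against the fixed-port issue mentioned above is to let the OS pick an ephemeral port before starting the server under test. A minimal, generic sketch (the HistoryServer's own configuration keys are not shown; the helper below is plain JDK code and not part of the Flink test utilities):

{code}
import java.io.IOException;
import java.net.ServerSocket;

final class FreePortUtil {

    private FreePortUtil() {}

    /** Asks the OS for a currently free ephemeral port. */
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            socket.setReuseAddress(true);
            return socket.getLocalPort();
        }
    }
}
{code}

Note that there is still a small race window between probing the port and binding it in the server, so retrying on bind failure is advisable.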


[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985287#comment-15985287
 ] 

ASF GitHub Bot commented on FLINK-5752:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
I checked the recent failure.
`Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 20.417 sec 
<<< FAILURE! - in 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest`

This failure seems unrelated to the changes in this patch.
@tonycox , @fhueske 
Just a gentle ping.


> Support push down projections for HBaseTableSource
> --
>
> Key: FLINK-5752
> URL: https://issues.apache.org/jira/browse/FLINK-5752
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
>
> This is a follow-up to the discussion about creating a NestedProjectableTableSource. 
> Currently we support nested schemas for non-relational stores like 
> HBase. 
> But this does not allow pushing down projections. This JIRA is to implement that. 
> Once FLINK-5698 is implemented, we should make use of that feature to 
> push down projections for a nested table. So in the case of HBase, if we have 
> {f1:{a, b}, f2:{c, d}} as the nested structure and a scan query 
> only needs to select f2.c, then we should be able to project 
> only the column 'c' under 'f2'. FLINK-5698 plans to add a new API for such 
> projections and HBaseTableSource should make use of that API to do the 
> projection.
> [~fhueske], [~tonycox], [~jark]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3760: FLINK-5752 Support push down projections for HBaseTableSo...

2017-04-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3760
  
I checked the recent failure.
`Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 20.417 sec 
<<< FAILURE! - in 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest`

This failure seems unrelated to the changes in this patch.
@tonycox , @fhueske 
Just a gentle ping.




[GitHub] flink issue #3655: [FLINK-6175] Harden HistoryServerTest#testFullArchiveLife...

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3655
  
While we are at hardening: the test seems to work with a fixed port. That is bound 
to cause failures due to port conflicts (for example, 8082 is commonly used if 
you have a local HA Flink setup for testing).




[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985265#comment-15985265
 ] 

ASF GitHub Bot commented on FLINK-6048:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3536


> Asynchronous snapshots for heap-based operator state backends
> -
>
> Key: FLINK-6048
> URL: https://issues.apache.org/jira/browse/FLINK-6048
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> The synchronous checkpointing mechanism of heap-based operator state backends 
> blocks element processing for the duration of the checkpoint.
> We could implement an heap-based operator state backend that allows for 
> asynchronous checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3536: [FLINK-6048] Asynchronous snapshots for heap-based...

2017-04-26 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3536




[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113519219
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-metrics-datadog
+   flink-metrics-datadog
+
+   
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   provided
+   
+
+   
+   com.squareup.okhttp3
+   okhttp
+   3.6.0
+   
+
+   
+   com.squareup.okio
+   okio
+   1.11.0
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
jar-with-dependencies
--- End diff --

yes.




[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985247#comment-15985247
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113519219
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-metrics-datadog
+   flink-metrics-datadog
+
+   
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   provided
+   
+
+   
+   com.squareup.okhttp3
+   okhttp
+   3.6.0
+   
+
+   
+   com.squareup.okio
+   okio
+   1.11.0
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
jar-with-dependencies
--- End diff --

yes.


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6338) SimpleStringUtils should be called StringValueUtils

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985245#comment-15985245
 ] 

ASF GitHub Bot commented on FLINK-6338:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3783
  
Please fix the PR title; you are referencing the wrong JIRA.


> SimpleStringUtils should be called StringValueUtils
> ---
>
> Key: FLINK-6338
> URL: https://issues.apache.org/jira/browse/FLINK-6338
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Trivial
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...

2017-04-26 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3783
  
Please fix the PR title; you are referencing the wrong JIRA.




[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985237#comment-15985237
 ] 

ASF GitHub Bot commented on FLINK-6048:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3536
  
Thanks for the review @tillrohrmann. I addressed your comments concerning 
unregistration of streams. Will merge this now.


> Asynchronous snapshots for heap-based operator state backends
> -
>
> Key: FLINK-6048
> URL: https://issues.apache.org/jira/browse/FLINK-6048
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> The synchronous checkpointing mechanism of heap-based operator state backends 
> blocks element processing for the duration of the checkpoint.
> We could implement an heap-based operator state backend that allows for 
> asynchronous checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3536: [FLINK-6048] Asynchronous snapshots for heap-based Operat...

2017-04-26 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3536
  
Thanks for the review @tillrohrmann. I addressed your comments concerning 
unregistration of streams. Will merge this now.




[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java

2017-04-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985221#comment-15985221
 ] 

Stephan Ewen commented on FLINK-6107:
-

I would like to make a suggestion for an adjustment to the style, specifically 
to the import order.

Basically all other files in Flink have the pattern:
  # Flink / library imports
  # java.foo.bar imports
  # static imports

Also, most files place blank lines between the groups that the imports are derived 
from.

It may just be me, being an old-school guy who looks at the imports quite a bit 
(almost for every new class I open, I find it useful to get an initial overview 
of what a class interacts with), but I find the previous style much better for 
getting a "quick glance" impression:

  - Spaces between logical groups (Flink / logger / library / etc.) 
  - Flink and libraries first (it's what matters for the overview)
  - java below (not really important for the overview)
  - static imports last (they are just syntactic sugar and not required for any 
understanding).

So, why don't we keep that style? It would also result in less necessary 
reformatting and fewer merge conflicts.
I pasted the examples below to illustrate that:

h4. Original formatting of most files
{code}
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;

import static org.apache.flink.util.Preconditions.checkArgument;
{code}

h4. New Format
{code}
import static org.apache.flink.util.Preconditions.checkArgument;

import java.io.IOException;
import java.util.ArrayDeque;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
{code}

> Add custom checkstyle for flink-streaming-java
> --
>
> Key: FLINK-6107
> URL: https://issues.apache.org/jira/browse/FLINK-6107
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.3.0
>
>
> There was some consensus on the ML 
> (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E)
>  that we want to have a more uniform code style. We should start 
> module-by-module and by introducing increasingly stricter rules. We have to 
> be aware of the PR situation and ensure that we have minimal breakage for 
> contributors.
> This issue aims at adding a custom checkstyle.xml for 
> {{flink-streaming-java}} that is based on our current checkstyle.xml but adds 
> these checks for Javadocs:
> (The checkstyle module configuration XML that belongs here was stripped in the mail archive; the checks it adds are listed below.)
> This checks:
>  - Every type has a type-level Javadoc
>  - Proper use of {{}} in Javadocs
>  - First sentence must end with a proper punctuation mark
>  - Proper use (including closing) of HTML tags



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985213#comment-15985213
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3511
  
@heytitle please remove the old FLINK-3722 commits and rebase to master.


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
> Fix For: 1.3.0
>
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
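
For illustration, a minimal, self-contained sketch of the incremental quotient/modulo idea described in the FLINK-3722 issue above. This is not the actual NormalizedKeySorter code; the class and field names and the fixed records-per-segment divisor are illustrative only:

{code}
/**
 * Sketch of the index iterator described in FLINK-3722: instead of computing
 * segment = index / recordsPerSegment and offset = index % recordsPerSegment
 * with a division on every access, the quotient and modulo are maintained
 * incrementally while the logical index is stepped by one.
 */
final class SortIndexIterator {

    private final int recordsPerSegment; // the divisor
    private int segment;                 // quotient: which memory segment
    private int offsetInSegment;         // modulo: record offset inside the segment

    SortIndexIterator(int recordsPerSegment) {
        this.recordsPerSegment = recordsPerSegment;
    }

    /** Steps the logical record index forward by one without any division. */
    void increment() {
        offsetInSegment++;
        if (offsetInSegment == recordsPerSegment) { // wrap-around: move to next segment
            offsetInSegment = 0;
            segment++;
        }
    }

    /** Steps the logical record index back by one without any division. */
    void decrement() {
        offsetInSegment--;
        if (offsetInSegment < 0) { // wrap-around: move to previous segment
            offsetInSegment = recordsPerSegment - 1;
            segment--;
        }
    }

    int segment() { return segment; }
    int offsetInSegment() { return offsetInSegment; }
}
{code}

Compare and swap could then be overloaded to take two such iterators instead of raw indices, as the description suggests.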


[GitHub] flink issue #3511: [Flink-5734] code generation for normalizedkey sorter

2017-04-26 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3511
  
@heytitle please remove the old FLINK-3722 commits and rebase to master.




[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985209#comment-15985209
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli closed the pull request at:

https://github.com/apache/flink/pull/3771


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread stefanobortoli
Github user stefanobortoli closed the pull request at:

https://github.com/apache/flink/pull/3771




[jira] [Updated] (FLINK-6357) ParameterTool get unrequested parameters

2017-04-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-6357:
--
Summary: ParameterTool get unrequested parameters  (was: Parametertool get 
unrequested parameters)

> ParameterTool get unrequested parameters
> 
>
> Key: FLINK-6357
> URL: https://issues.apache.org/jira/browse/FLINK-6357
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The Gelly examples use {{ParameterTool}} to parse required and optional 
> parameters. In the latter case we should detect if a user mistypes a 
> parameter name. I would like to add a {{Set<String> getUnrequestedParameters()}} 
> method returning parameter names not requested by {{has}} or any of the {{get}} 
> methods.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985208#comment-15985208
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske I have created #3783 with just the code generation part. At least 
the GROUP BY distinct can move ahead. I will close this PR and wait for the 
merging of the Calcite fix.


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske I have created #3783 with just the code generation part. At least 
the GROUP BY distinct can move ahead. I will close this PR and wait for the 
merging of the Calcite fix.




[jira] [Updated] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2017-04-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3722:
--
Fix Version/s: 1.3.0

> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
> Fix For: 1.3.0
>
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2017-04-26 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-3722.
-
Resolution: Implemented

Implemented in 336b95d4eedc23e5ce37d1739165157e127c65f8

> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
> Fix For: 1.3.0
>
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985203#comment-15985203
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2628


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2628: [FLINK-3722] [runtime] Don't / and % when sorting

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2628




[jira] [Commented] (FLINK-6338) SimpleStringUtils should be called StringValueUtils

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985198#comment-15985198
 ] 

ASF GitHub Bot commented on FLINK-6338:
---

Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3783
  
@fhueske please have a look at this PR; it contains just the code 
generation part with optional DISTINCT.


> SimpleStringUtils should be called StringValueUtils
> ---
>
> Key: FLINK-6338
> URL: https://issues.apache.org/jira/browse/FLINK-6338
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Trivial
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3783
  
@fhueske please have a look at this PR; it contains just the code 
generation part with optional DISTINCT.




[jira] [Commented] (FLINK-6338) SimpleStringUtils should be called StringValueUtils

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985197#comment-15985197
 ] 

ASF GitHub Bot commented on FLINK-6338:
---

GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3783

[FLINK-6338] Add support for DISTINCT into Code Generated Aggregations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6338

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3783


commit 55e0a8d38187bef22b6135db7f4a5c1cc8f15811
Author: Stefano Bortoli 
Date:   2017-04-26T17:22:04Z

Added code generation distinct aggregation logic




> SimpleStringUtils should be called StringValueUtils
> ---
>
> Key: FLINK-6338
> URL: https://issues.apache.org/jira/browse/FLINK-6338
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Trivial
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3783: [FLINK-6338] Add support for DISTINCT into Code Ge...

2017-04-26 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3783

[FLINK-6338] Add support for DISTINCT into Code Generated Aggregations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6338

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3783


commit 55e0a8d38187bef22b6135db7f4a5c1cc8f15811
Author: Stefano Bortoli 
Date:   2017-04-26T17:22:04Z

Added code generation distinct aggregation logic






[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985190#comment-15985190
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113512176
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map gauges = new ConcurrentHashMap<>();
+   private final Map counters = new 
ConcurrentHashMap<>();
+   private final Map meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List configTags;
+
+   public static final String API_KEY = "apikey";
--- End diff --

I wouldn't worry too much about it, since 1) the current code works well and 
changing it doesn't provide extra readability, and 2) it would require a dependency on 
flink-core, which is completely unnecessary just for a config key.


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-26 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113512176
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map gauges = new ConcurrentHashMap<>();
+   private final Map counters = new 
ConcurrentHashMap<>();
+   private final Map meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List configTags;
+
+   public static final String API_KEY = "apikey";
--- End diff --

I wouldn't worry too much about it, since 1) the current code works well and 
changing it doesn't provide extra readability, and 2) it would require a dependency on 
flink-core, which is completely unnecessary just for a config key.




[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985159#comment-15985159
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113510125
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-metrics-datadog
+   flink-metrics-datadog
+
+   
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   provided
+   
+
+   
+   com.squareup.okhttp3
+   okhttp
+   3.6.0
+   
+
+   
+   com.squareup.okio
+   okio
+   1.11.0
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
jar-with-dependencies
--- End diff --

After removing this line, the shaded jar will be named 
'flink-metrics-datadog-1.3-SNAPSHOT-shaded.jar'. Is that what you want?


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-26 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113510125
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-metrics-datadog
+   flink-metrics-datadog
+
+   
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   provided
+   
+
+   
+   com.squareup.okhttp3
+   okhttp
+   3.6.0
+   
+
+   
+   com.squareup.okio
+   okio
+   1.11.0
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
jar-with-dependencies
--- End diff --

After removing this line, the shaded jar will be named 
'flink-metrics-datadog-1.3-SNAPSHOT-shaded.jar'. Is that what you want?




[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985157#comment-15985157
 ] 

ASF GitHub Bot commented on FLINK-6390:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3782
  
Thanks a lot for the fast review. Agree with both issues raised. Will 
address them while merging...


> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param  The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator prior when 
> triggering a checkpoint, prior
>* to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
> 
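
For illustration, a self-contained sketch of the hook mechanism described in the FLINK-6390 issue above. It deliberately does not use the actual Flink interface (whose final signatures are not fully shown here); the simplified interface and the external-system calls below are stand-ins for illustration only:

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Simplified stand-in for the hook interface described above (illustrative only). */
interface TriggerRestoreHook<T> {
    /** Stable identifier used to map checkpointed data back to this hook. */
    String getIdentifier();
    /** Called before the "trigger checkpoint" messages are sent to the sources. */
    CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor);
    /** Called on restore with the data this hook stored in the checkpoint metadata. */
    void restoreCheckpoint(long checkpointId, T checkpointData);
}

/** Example hook that asks a (hypothetical) external source system to prepare a snapshot. */
class ExternalSourceSnapshotHook implements TriggerRestoreHook<String> {

    private final String namespace;

    ExternalSourceSnapshotHook(String namespace) {
        this.namespace = namespace;
    }

    @Override
    public String getIdentifier() {
        // Deterministic, so a savepoint taken by one job can be restored by another.
        return "external-source-" + namespace;
    }

    @Override
    public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
        // Run the preparatory work asynchronously; the returned value would be stored
        // with the checkpoint metadata under this hook's identifier.
        return CompletableFuture.supplyAsync(
                () -> "snapshot-" + namespace + "-" + checkpointId, executor);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, String checkpointData) {
        // Tell the external system which snapshot to roll back to.
        System.out.println("Restoring " + namespace + " from " + checkpointData);
    }
}
{code}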

[GitHub] flink issue #3782: [FLINK-6390] [checkpoints] Add API for checkpoints that a...

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3782
  
Thanks a lot for the fast review. Agree with both issues raised. Will 
address them while merging...




[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985154#comment-15985154
 ] 

ASF GitHub Bot commented on FLINK-6390:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3782#discussion_r113504901
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * A simple serializer interface for versioned serialization.
+ * 
+ * The serializer has a version (returned by {@link #getVersion()}) 
which can be attached
+ * to the serialized data. When the serializer evolves, the version can be 
used to identify
+ * with which prior version the data was serialized.
+ * 
+ * {@code
+ * MyType someObject = ...;
+ * SimpleVersionedSerializer<MyType> serializer = ...;
+ *
+ * byte[] serializedData = serializer.serialize(someObject);
+ * int version = serializer.getVersion();
+ *
+ * MyType deserialized = serializer.deserialize(version, serializedData);
+ * 
+ * byte[] someOldData = ...;
+ * int oldVersion = ...;
+ * MyType deserializedOldObject = serializer.deserialize(oldVersion, 
someOldData);
+ * 
+ * }
+ * 
+ * @param <E> The data type serialized / deserialized by this serializer.
+ */
+public interface SimpleVersionedSerializer<E> extends Versioned {
+
+   /**
+* Gets the version with which this serializer serializes.
+* 
+* @return The version of the serialization schema.
+*/
+   @Override
+   int getVersion();
+
+   /**
+* Serializes the given object. The serialization is assumed to 
correspond to the
+* current serialization version (as returned by {@link #getVersion()}).
+*
+* 
+* @param checkpointData The object to serialize.
+* @return The serialized data (bytes).
+* 
+* @throws IOException Thrown, if the serialization fails.
+*/
+   byte[] serialize(E checkpointData) throws IOException;
--- End diff --

`checkpointData` might be too specific a parameter name.


> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface that allows sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hooks in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The 

[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985155#comment-15985155
 ] 

ASF GitHub Bot commented on FLINK-6390:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3782#discussion_r113507485
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public class MasterHooks {
+
+   // ------------------------------------------------------------------------
+   //  checkpoint triggering
+   // ------------------------------------------------------------------------
+
+   /**
+* Triggers all given master hooks and returns state objects for each 
hook that
+* produced a state.
+* 
+* @param hooks The hooks to trigger
+* @param checkpointId The checkpoint ID of the triggering checkpoint
+* @param timestamp The (informational) timestamp for the triggering 
checkpoint 
+* @param executor An executor that can be used for asynchronous I/O 
calls
+* @param timeout The maximum time that a hook may take to complete
+* 
+* @return A list containing all states produced by the hooks
+* 
+* @throws FlinkException Thrown, if the hooks throw an exception, or 
the state
+*deserialization fails.
+*/
+   public static List<MasterState> triggerMasterHooks(
+   Collection<MasterTriggerRestoreHook<?>> hooks,
+   long checkpointId,
+   long timestamp,
+   Executor executor,
+   Time timeout) throws FlinkException {
+
+   final ArrayList<MasterState> states = new 
ArrayList<>(hooks.size());
+
+   for (MasterTriggerRestoreHook<?> hook : hooks) {
+   MasterState state = triggerHook(hook, checkpointId, 
timestamp, executor, timeout);
+   if (state != null) {
+   states.add(state);
+   }
+   }
+
+   states.trimToSize();
+   return states;
+   }
+
+   private static <T> MasterState triggerHook(
+   MasterTriggerRestoreHook<?> hook,
+   long checkpointId,
+   long timestamp,
+   Executor executor,
+   Time timeout) throws FlinkException {
+
+   @SuppressWarnings("unchecked")
+   final MasterTriggerRestoreHook<T> typedHook = 
(MasterTriggerRestoreHook<T>) hook;
+
+   final String id = typedHook.getIdentifier();
+   final SimpleVersionedSerializer<T> serializer = 
typedHook.createCheckpointDataSerializer();
+
+   // call the hook!
+   final Future<T> resultFuture;
+   try {
+   resultFuture = 
typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+   }

[GitHub] flink pull request #3782: [FLINK-6390] [checkpoints] Add API for checkpoints...

2017-04-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3782#discussion_r113504901
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * A simple serializer interface for versioned serialization.
+ * 
+ * The serializer has a version (returned by {@link #getVersion()}) 
which can be attached
+ * to the serialized data. When the serializer evolves, the version can be 
used to identify
+ * with which prior version the data was serialized.
+ * 
+ * {@code
+ * MyType someObject = ...;
+ * SimpleVersionedSerializer<MyType> serializer = ...;
+ *
+ * byte[] serializedData = serializer.serialize(someObject);
+ * int version = serializer.getVersion();
+ *
+ * MyType deserialized = serializer.deserialize(version, serializedData);
+ * 
+ * byte[] someOldData = ...;
+ * int oldVersion = ...;
+ * MyType deserializedOldObject = serializer.deserialize(oldVersion, 
someOldData);
+ * 
+ * }
+ * 
+ * @param <E> The data type serialized / deserialized by this serializer.
+ */
+public interface SimpleVersionedSerializer<E> extends Versioned {
+
+   /**
+* Gets the version with which this serializer serializes.
+* 
+* @return The version of the serialization schema.
+*/
+   @Override
+   int getVersion();
+
+   /**
+* Serializes the given object. The serialization is assumed to 
correspond to the
+* current serialization version (as returned by {@link #getVersion()}).
+*
+* 
+* @param checkpointData The object to serialize.
+* @return The serialized data (bytes).
+* 
+* @throws IOException Thrown, if the serialization fails.
+*/
+   byte[] serialize(E checkpointData) throws IOException;
--- End diff --

`checkpointData` might be too specific a parameter name.

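For context, a possible implementation of the interface under review might look like the sketch below. It assumes the `deserialize(int, byte[])` method shown in the class Javadoc's usage example, and the String payload is purely illustrative.

{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.io.SimpleVersionedSerializer;

/**
 * Sketch of a versioned serializer for a plain String payload.
 */
public class StringVersionedSerializer implements SimpleVersionedSerializer<String> {

    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(String value) throws IOException {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        // once the serializer evolves, older formats would be handled here by version
        if (version != VERSION) {
            throw new IOException("Unrecognized version: " + version);
        }
        return new String(serialized, StandardCharsets.UTF_8);
    }
}
{code}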



[GitHub] flink pull request #3782: [FLINK-6390] [checkpoints] Add API for checkpoints...

2017-04-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3782#discussion_r113507485
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public class MasterHooks {
+
+   // ------------------------------------------------------------------------
+   //  checkpoint triggering
+   // ------------------------------------------------------------------------
+
+   /**
+* Triggers all given master hooks and returns state objects for each 
hook that
+* produced a state.
+* 
+* @param hooks The hooks to trigger
+* @param checkpointId The checkpoint ID of the triggering checkpoint
+* @param timestamp The (informational) timestamp for the triggering 
checkpoint 
+* @param executor An executor that can be used for asynchronous I/O 
calls
+* @param timeout The maximum time that a hook may take to complete
+* 
+* @return A list containing all states produced by the hooks
+* 
+* @throws FlinkException Thrown, if the hooks throw an exception, or 
the state
+*deserialization fails.
+*/
+   public static List<MasterState> triggerMasterHooks(
+   Collection<MasterTriggerRestoreHook<?>> hooks,
+   long checkpointId,
+   long timestamp,
+   Executor executor,
+   Time timeout) throws FlinkException {
+
+   final ArrayList<MasterState> states = new 
ArrayList<>(hooks.size());
+
+   for (MasterTriggerRestoreHook<?> hook : hooks) {
+   MasterState state = triggerHook(hook, checkpointId, 
timestamp, executor, timeout);
+   if (state != null) {
+   states.add(state);
+   }
+   }
+
+   states.trimToSize();
+   return states;
+   }
+
+   private static <T> MasterState triggerHook(
+   MasterTriggerRestoreHook<?> hook,
+   long checkpointId,
+   long timestamp,
+   Executor executor,
+   Time timeout) throws FlinkException {
+
+   @SuppressWarnings("unchecked")
+   final MasterTriggerRestoreHook<T> typedHook = 
(MasterTriggerRestoreHook<T>) hook;
+
+   final String id = typedHook.getIdentifier();
+   final SimpleVersionedSerializer<T> serializer = 
typedHook.createCheckpointDataSerializer();
+
+   // call the hook!
+   final Future<T> resultFuture;
+   try {
+   resultFuture = 
typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+   }
+   catch (Throwable t) {
+   ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+   throw new FlinkException("Error while triggering 
checkpoint master hook '" + id + '\'', t);
+   

[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985138#comment-15985138
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
I think this is reasonable, as the current implementation doesn't work for 
dynamically added topics. (We should also deprecate the current one.)

But let's hear what others say :)


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner's {{partition}} method is always invoked with the number of 
> partitions of the default topic, and not the number of partitions of the 
> current {{targetTopic}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

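To illustrate the mismatch described above: when a custom partitioning scheme is always handed the partition count of the default topic, the chosen partition may not exist in the topic the record is actually routed to. A self-contained sketch (the class and method names are made up for the example and are not Flink API):

{code}
/**
 * Illustrative only: shows what goes wrong when the partition count of the
 * default topic is used for every target topic.
 */
public class PartitionCountMismatchExample {

    static int partitionFor(String key, int numPartitions) {
        // a typical hash-based custom partitioning scheme (mask keeps the value non-negative)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int defaultTopicPartitions = 8;   // partitions of the producer's default topic
        int targetTopicPartitions  = 3;   // partitions of the topic this record is routed to

        int chosen = partitionFor("some-key", defaultTopicPartitions);

        // If the producer always passes the default topic's count, the chosen
        // partition may not exist in the target topic at all.
        System.out.println("chosen partition = " + chosen
                + ", valid for target topic = " + (chosen < targetTopicPartitions));
    }
}
{code}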

[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
I think this is reasonable, as the current implementation doesn't work for 
dynamically added topics. (We should also deprecate the current one.)

But let's hear what others say :)




[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985122#comment-15985122
 ] 

Gyula Fora commented on FLINK-6390:
---

we could call it completeCheckpoint for example

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface that allows sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hooks in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param <T> The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator when triggering a 
> checkpoint, prior
>* to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hook's identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
>* @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>* @param timestamp The wall clock timestamp when the 

[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985119#comment-15985119
 ] 

Gyula Fora commented on FLINK-6390:
---

Hi Stephan,

This looks pretty useful. One thing that came to my mind about this is whether it 
makes sense to add a hook when all tasks have completed their local snapshot 
but before completing the full snapshot. (To implement a two-phase commit 
logic, for instance, which could be used by backends that present the data 
externally.)

Gyula

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface that allows sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hooks in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param <T> The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator when triggering a 
> checkpoint, prior
>* to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hook's identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by 

[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15985117#comment-15985117
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user PangZhi commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113505564
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList<org.apache.flink.types.Row> 
rowCollection = new ArrayList<>(20);
--- End diff --

@zentol There is another Row class used here, so we need to use the fully qualified name.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying a query is not supported for a Row stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

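For reference, the name clash behind the fully qualified name looks roughly like the sketch below, assuming the other Row in the test is the Cassandra driver's com.datastax.driver.core.Row; the class and field names are illustrative.

{code}
import java.util.ArrayList;

import com.datastax.driver.core.Row;   // the driver's Row keeps the short name

class RowNameClashSketch {

    // rows read back from Cassandra via the driver
    private final ArrayList<Row> driverRows = new ArrayList<>();

    // Flink's Row cannot also be imported, so it is written out fully qualified
    private final ArrayList<org.apache.flink.types.Row> flinkRows = new ArrayList<>(20);
}
{code}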
