[jira] [Commented] (FLINK-6367) support custom header settings of allow origin

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996257#comment-15996257
 ] 

ASF GitHub Bot commented on FLINK-6367:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3769
  
Patience please...


> support custom header settings of allow origin
> --
>
> Key: FLINK-6367
> URL: https://issues.apache.org/jira/browse/FLINK-6367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> `jobmanager.web.access-control-allow-origin`: enables a custom value for the
> Access-Control-Allow-Origin header of the web frontend; the default is `*`.
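A rough illustration of how such a setting would be used (not taken from the pull request; the origin URL and the programmatic variant below are made up for the example). The same key would normally go into flink-conf.yaml:

{code}
import org.apache.flink.configuration.Configuration;

public class WebAllowOriginExample {

	public static void main(String[] args) {
		// flink-conf.yaml equivalent (key from this issue, default "*"):
		//   jobmanager.web.access-control-allow-origin: https://dashboard.example.com
		Configuration config = new Configuration();
		config.setString("jobmanager.web.access-control-allow-origin", "https://dashboard.example.com");

		// The web frontend would then answer with
		// "Access-Control-Allow-Origin: https://dashboard.example.com" instead of "*".
		System.out.println(config.getString("jobmanager.web.access-control-allow-origin", "*"));
	}
}
{code}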





[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...

2017-05-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3769
  
Patience please...




[jira] [Commented] (FLINK-6439) Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996245#comment-15996245
 ] 

ASF GitHub Bot commented on FLINK-6439:
---

GitHub user fanyon opened a pull request:

https://github.com/apache/flink/pull/3819

[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil


Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fanyon/flink FLINK-6439

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3819


commit de723c7eb9ad9b8c8c27d6b884d47faa7d324dfd
Author: mengji.fy 
Date:   2017-05-04T05:37:01Z

[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil




> Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
> --
>
> Key: FLINK-6439
> URL: https://issues.apache.org/jira/browse/FLINK-6439
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Fang Yong
>Priority: Minor
>
> {code}
> FileInputStream in = new FileInputStream(path);
> DataInputStream dis = new DataInputStream(in);
> {code}
> Neither {{in}} nor {{dis}} is closed upon return from the method.
> In writeStateHandle(), the OutputStream should be closed in a finally block.
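For illustration, a minimal sketch of the fix described above, using try-with-resources so both streams are closed on every exit path. This is not the actual OperatorSnapshotUtil code; the method body and return type are simplified stand-ins:

{code}
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class ReadStateHandleSketch {

	// Simplified stand-in for OperatorSnapshotUtil#readStateHandle(): closing dis
	// also closes the wrapped FileInputStream, on both normal and exceptional return.
	public static byte[] readStateHandle(String path) throws IOException {
		try (FileInputStream in = new FileInputStream(path);
				DataInputStream dis = new DataInputStream(in)) {
			int length = dis.readInt();
			byte[] data = new byte[length];
			dis.readFully(data);
			return data;
		}
	}
}
{code}

The write path would analogously close its OutputStream in a finally block (or try-with-resources), as the issue suggests.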





[GitHub] flink pull request #3819: [FLINK-6439] Fix close OutputStream && InputStream...

2017-05-03 Thread fanyon
GitHub user fanyon opened a pull request:

https://github.com/apache/flink/pull/3819

[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil


Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fanyon/flink FLINK-6439

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3819


commit de723c7eb9ad9b8c8c27d6b884d47faa7d324dfd
Author: mengji.fy 
Date:   2017-05-04T05:37:01Z

[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil






[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996233#comment-15996233
 ] 

ASF GitHub Bot commented on FLINK-6364:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3801
  
@StefanRRichter  Thanks a lot for your review. I have updated the pull request as 
suggested. The following changes were made:

1. Remove the checkpoint type for incremental checkpoints. Now the support 
for incremental checkpointing becomes a configurable feature in 
`RocksDBKeyedStateBackend`, just like asynchronous checkpointing in 
`HeapKeyedStateBackend`. Incremental checkpointing will be performed if the 
feature is enabled and the checkpoint to perform is not a savepoint.

2. Rename `RocksDBKeyedStateHandle` to `RocksDBIncrementalKeyedStateHandle` 
and do some refactoring.

3. Allow `KeyedStateHandle` to register shared states.

4. Maintain the information of last completed checkpoint with the 
notification of `AbstractStreamOperator`.

5. Parameterize `RocksDBStateBackendTest` to test the cleanup of resources 
in both full and incremental checkpointing.

6. Parameterize `PartitionedStateCheckpointingITCase` to test the 
snapshotting and restoring with different backend settings.

I would appreciate it if you could take a look at these changes. Any comments are 
welcome.


> Implement incremental checkpointing in RocksDBStateBackend
> --
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is based on LSM trees, which record updates in new sst files, and all 
> sst files are immutable. By only materializing those new sst files, we can 
> significantly improve the performance of checkpointing.
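For intuition, the incremental scheme boils down to a set difference over immutable sst file names: only files that were not already part of the last completed checkpoint have to be materialized. A self-contained sketch of that idea (names are illustrative, not the pull request's API):

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class IncrementalSstSelection {

	// Everything the last completed checkpoint did not ship must be uploaded;
	// unchanged sst files can be referenced as shared state instead.
	static Set<String> newSstFiles(Set<String> currentSstFiles, Set<String> lastCheckpointSstFiles) {
		Set<String> newFiles = new HashSet<>(currentSstFiles);
		newFiles.removeAll(lastCheckpointSstFiles);
		return newFiles;
	}

	public static void main(String[] args) {
		Set<String> previous = new HashSet<>(Arrays.asList("000001.sst", "000002.sst"));
		Set<String> current = new HashSet<>(Arrays.asList("000001.sst", "000002.sst", "000003.sst"));
		// Prints [000003.sst]: the only file this checkpoint has to upload.
		System.out.println(newSstFiles(current, previous));
	}
}
{code}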





[GitHub] flink issue #3801: [FLINK-6364] [checkpoints] Implement incremental checkpoi...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3801
  
@StefanRRichter  Thanks a lot for your review. I have updated the pull request as 
suggested. The following changes were made:

1. Remove the checkpoint type for incremental checkpoints. Now the support 
for incremental checkpointing becomes a configurable feature in 
`RocksDBKeyedStateBackend`, just like asynchronous checkpointing in 
`HeapKeyedStateBackend`. Incremental checkpointing will be performed if the 
feature is enabled and the checkpoint to perform is not a savepoint.

2. Rename `RocksDBKeyedStateHandle` to `RocksDBIncrementalKeyedStateHandle` 
and do some refactoring.

3. Allow `KeyedStateHandle` to register shared states.

4. Maintain the information of last completed checkpoint with the 
notification of `AbstractStreamOperator`.

5. Parameterize `RocksDBStateBackendTest` to test the cleanup of resources 
in both full and incremental checkpointing.

6. Parameterize `PartitionedStateCheckpointingITCase` to test the 
snapshotting and restoring with different backend settings.

I would appreciate it if you could take a look at these changes. Any comments are 
welcome.




[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114703946
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
}
}
 
+	private static class RocksDBIncrementalRestoreOperation {
+
+		private final RocksDBKeyedStateBackend stateBackend;
+
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) {
+			this.stateBackend = stateBackend;
+		}
+
+		private List> readMetaData(
+				StreamStateHandle metaStateHandle) throws Exception {
+
+			FSDataInputStream inputStream = null;
+
+			try {
+				inputStream = metaStateHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+				DataInputView in = new DataInputViewStreamWrapper(inputStream);
+				serializationProxy.read(in);
+
+				return serializationProxy.getNamedStateSerializationProxies();
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+			}
+		}
+
+		private void readStateData(
+				Path restoreFilePath,
+				StreamStateHandle remoteFileHandle) throws IOException {
+
+			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
+
+			FSDataInputStream inputStream = null;
+			FSDataOutputStream outputStream = null;
+
+			try {
+				inputStream = remoteFileHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				byte[] buffer = new byte[1024];
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+				}
+
+				if (outputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					outputStream.close();
+				}
+			}
+		}
+
+		private void restoreInstance(
+				RocksDBKeyedStateHandle restoreStateHandle,
+				boolean hasExtraKeys) throws Exception {
+
+			// read state data
+			Path restoreInstancePath = new Path(
+				stateBackend.instanceBasePath.getAbsolutePath(),
+				UUID.randomUUID().toString());
+
+			try {
+				Map sstFiles = restoreStateHandle.getSstFiles();
+				for (Map.Entry sstFileEntry : sstFiles.entrySet()) {
+					String fileName = sstFileEntry.getKey();
+					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
+
+					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
+				}
+
+				Map miscFiles = restoreStateHandle.getM

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114703775
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
}
}
 
+	private static class RocksDBIncrementalSnapshotOperation {
+
+		private final RocksDBKeyedStateBackend stateBackend;
+
+		private final CheckpointStreamFactory checkpointStreamFactory;
+
+		private final long checkpointId;
+
+		private final long checkpointTimestamp;
+
+		private Map baseSstFiles;
+
+		private List> stateMetaInfos = new ArrayList<>();
+
+		private FileSystem backupFileSystem;
+		private Path backupPath;
+
+		private FSDataInputStream inputStream = null;
+		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+		// new sst files since the last completed checkpoint
+		private Set newSstFileNames = new HashSet<>();
+
+		// handles to the sst files in the current snapshot
+		private Map sstFiles = new HashMap<>();
+
+		// handles to the misc files in the current snapshot
+		private Map miscFiles = new HashMap<>();
+
+		private StreamStateHandle metaStateHandle = null;
+
+		private RocksDBIncrementalSnapshotOperation(
+				RocksDBKeyedStateBackend stateBackend,
+				CheckpointStreamFactory checkpointStreamFactory,
+				long checkpointId,
+				long checkpointTimestamp) {
+
+			this.stateBackend = stateBackend;
+			this.checkpointStreamFactory = checkpointStreamFactory;
+			this.checkpointId = checkpointId;
+			this.checkpointTimestamp = checkpointTimestamp;
+		}
+
+		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+			try {
+				final byte[] buffer = new byte[1024];
+
+				FileSystem backupFileSystem = backupPath.getFileSystem();
+				inputStream = backupFileSystem.open(filePath);
+				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+				outputStream = checkpointStreamFactory
+					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+
+				return outputStream.closeAndGetHandle();
+			} finally {
+				if (inputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+					inputStream.close();
+					inputStream = null;
+				}
+
+				if (outputStream != null) {
+					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+					outputStream.close();
+					outputStream = null;
+				}
+			}
+		}
+
+		private StreamStateHandle materializeMetaData() throws Exception {
+			try {
+				outputStream = checkpointStreamFactory
+					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+				KeyedBackendSerializationProxy serializationProxy =
+					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
+
+				serializ

[jira] [Assigned] (FLINK-6439) Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()

2017-05-03 Thread Fang Yong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-6439:


Assignee: Fang Yong

> Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
> --
>
> Key: FLINK-6439
> URL: https://issues.apache.org/jira/browse/FLINK-6439
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Fang Yong
>Priority: Minor
>
> {code}
> FileInputStream in = new FileInputStream(path);
> DataInputStream dis = new DataInputStream(in);
> {code}
> Neither {{in}} nor {{dis}} is closed upon return from the method.
> In writeStateHandle(), the OutputStream should be closed in a finally block.





[GitHub] flink pull request #3569: [flink-6036]Let catalog support partition

2017-05-03 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3569#discussion_r114697342
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -44,6 +43,8 @@ case class ExternalCatalogTable(
 properties: JMap[String, String] = new JHashMap(),
 stats: TableStats = null,
 comment: String = null,
+partitionColumnNames: JLinkedHashSet[String] = new JLinkedHashSet(),
+isPartitioned: Boolean = false,
--- End diff --

add comments for ``partitionColumnNames`` and ``isPartitioned``




[jira] [Updated] (FLINK-6381) Unnecessary synchronizing object in BucketingSink

2017-05-03 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6381:

Description: 
It seems there are currently two places that should not use {{synchronized}} on 
{{pendingFilesPerCheckpoint}}, since it is only a restored state object for a checkpoint 
and the data structure is not shared between threads. The code in {{BucketingSink}} is 
as follows.
{code}
private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
	...
	synchronized (restoredState.pendingFilesPerCheckpoint) {
		restoredState.pendingFilesPerCheckpoint.clear();
	}
	...
}
{code}
and
{code}
private void handleRestoredBucketState(State restoredState) {
	...
	synchronized (bucketState.pendingFilesPerCheckpoint) {
		bucketState.pendingFilesPerCheckpoint.clear();
	}
}
{code}
Hi, [~kkl0u]. Is there anything else that should be added here? Would you mind taking a 
more thorough look at this class? Thanks, I would really appreciate it.

  was:
It seems that currently there are two places should not employ the 
{{synchronized}} to describe {{pendingFilesPerCheckpoint}}, as it is only 
restored state object for checkpoint and no sharing of the data-structure 
between different threads. Codes in  {{BucketingSink}} are as follow. {code} 
private void handleRestoredRollingSinkState(RollingSink.BucketState 
restoredState) {
 ...
synchronized (restoredState.pendingFilesPerCheckpoint) {
restoredState.pendingFilesPerCheckpoint.clear();
}
 ...
}{code} and {code}private void handleRestoredBucketState(State 
restoredState) {   
...
synchronized (bucketState.pendingFilesPerCheckpoint) {
bucketState.pendingFilesPerCheckpoint.clear();
}
 } {code}
Hi, [~kkl0u]. Is there any other stuff shoud add here ? Would you mind have a 
more thorough look in this class ? Thanks go out to you. I am very appreciate 
it.


> Unnecessary synchronizing object in BucketingSink
> -
>
> Key: FLINK-6381
> URL: https://issues.apache.org/jira/browse/FLINK-6381
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> It seems there are currently two places that should not use {{synchronized}} on 
> {{pendingFilesPerCheckpoint}}, since it is only a restored state object for a 
> checkpoint and the data structure is not shared between threads. The code in 
> {{BucketingSink}} is as follows.
> {code}
> private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
>   ...
>   synchronized (restoredState.pendingFilesPerCheckpoint) {
>     restoredState.pendingFilesPerCheckpoint.clear();
>   }
>   ...
> }
> {code}
> and
> {code}
> private void handleRestoredBucketState(State restoredState) {
>   ...
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
>     bucketState.pendingFilesPerCheckpoint.clear();
>   }
> }
> {code}
> Hi, [~kkl0u]. Is there anything else that should be added here? Would you mind taking 
> a more thorough look at this class? Thanks, I would really appreciate it.
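For illustration, a trimmed-down, self-contained sketch of the simplification the issue proposes; the class below is a stand-in, not the real BucketingSink:

{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreWithoutLock {

	// Stand-in for the sink's bucket state: pending files grouped by checkpoint id.
	static class BucketState {
		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
	}

	// The restored state object is confined to the restoring task, so clearing the
	// map needs no synchronized block; no other thread can observe it.
	static void handleRestoredBucketState(BucketState restoredState) {
		restoredState.pendingFilesPerCheckpoint.clear();
	}
}
{code}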





[jira] [Commented] (FLINK-6346) Migrate from Java serialization for GenericWriteAheadSink's state

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996133#comment-15996133
 ] 

ASF GitHub Bot commented on FLINK-6346:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3812
  
@zhangminglei @zentol Thanks for your suggestions; I have updated the code.

As discussed in https://github.com/apache/flink/pull/3750#issuecomment-298869900, it may 
not be necessary to register two states as the current approach does; older versions of 
the data could instead be converted by other means, such as an upgrade tool. I look 
forward to hearing your thoughts once you have had a chance to think about it. @tzulitai 


> Migrate from Java serialization for GenericWriteAheadSink's state
> -
>
> Key: FLINK-6346
> URL: https://issues.apache.org/jira/browse/FLINK-6346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration 
> for {{GenericWriteAheadSink}}.
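For context, "migrating away from Java serialization" here generally means writing the state field by field with Flink's data views instead of ObjectOutputStream, so the on-disk layout stays explicit and versionable. A rough sketch with made-up field names, not the sink's real state layout:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

public class PendingCheckpointSerializationSketch {

	// Made-up stand-in for one pending-checkpoint entry of the write-ahead sink.
	static class PendingCheckpoint {
		long checkpointId;
		long timestamp;
		int subtaskIndex;
	}

	static byte[] serialize(PendingCheckpoint p) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(bytes);
		// Explicit, field-by-field layout instead of Java object serialization.
		out.writeLong(p.checkpointId);
		out.writeLong(p.timestamp);
		out.writeInt(p.subtaskIndex);
		return bytes.toByteArray();
	}

	static PendingCheckpoint deserialize(byte[] data) throws IOException {
		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(data));
		PendingCheckpoint p = new PendingCheckpoint();
		p.checkpointId = in.readLong();
		p.timestamp = in.readLong();
		p.subtaskIndex = in.readInt();
		return p;
	}
}
{code}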





[GitHub] flink issue #3812: [FLINK-6346] Migrate from Java serialization for GenericW...

2017-05-03 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3812
  
@zhangminglei @zentol Thanks for your suggestions; I have updated the code.

As discussed in https://github.com/apache/flink/pull/3750#issuecomment-298869900, it may 
not be necessary to register two states as the current approach does; older versions of 
the data could instead be converted by other means, such as an upgrade tool. I look 
forward to hearing your thoughts once you have had a chance to think about it. @tzulitai 




[GitHub] flink pull request #3569: [flink-6036]Let catalog support partition

2017-05-03 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3569#discussion_r114695239
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap, HashMap => JHashMap}
+
+import org.apache.flink.table.plan.stats.TablePartitionStats
+
+object ExternalCatalogTypes {
+
+  /**
+* external table partition specification.
+* Key is partition column name, value is partition column value.
+*/
+  type PartitionSpec = JLinkedHashMap[String, String]
+}
+
+/**
+  * Partition definition of an external Catalog table
+  *
+  * @param partitionSpec partition specification
+  * @param properties partition properties
+  * @param stats partition statistics
+  */
+case class ExternalCatalogTablePartition(
--- End diff --

This file is the same as 
flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala; 
remove it.




[GitHub] flink pull request #3810: [Flink-6397] MultipleProgramsTestBase does not res...

2017-05-03 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/3810#discussion_r114693043
  
--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---
@@ -80,29 +80,36 @@

protected final TestExecutionMode mode;
 
-   
+   private TestEnvironment testEnvironment;
+
+   private CollectionTestEnvironment collectionTestEnvironment;
+
public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;
-   
+
switch(mode){
case CLUSTER:
-   new TestEnvironment(cluster, 4).setAsContext();
+   testEnvironment = new TestEnvironment(cluster, 4);
--- End diff --

Sounds reasonable.




[GitHub] flink issue #3810: [Flink-6397] MultipleProgramsTestBase does not reset Cont...

2017-05-03 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/3810
  
@StephanEwen 
I agree that a static unset method would be a much easier implementation. 
Do you think it is acceptable for the unset method to be static while the set method 
stays non-static in TestEnvironment? Alternatively, we could make the set method static 
as well, but that would require more changes. 
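
For what it is worth, a minimal sketch of the static-unset idea; the class, field and method names below are hypothetical and only illustrate why unsetting does not need an instance:

```java
// Hypothetical sketch, not the actual TestEnvironment API: setAsContext() needs
// the instance it registers, while unsetting only clears a static reference.
public class TestEnvironmentSketch {

	private static ExecutionEnvironmentFactorySketch currentFactory;

	public void setAsContext() {
		// capture "this" so a later getExecutionEnvironment() would return this instance
		currentFactory = () -> this;
	}

	public static void unsetAsContext() {
		// static: nothing instance-specific is needed to undo the registration
		currentFactory = null;
	}

	interface ExecutionEnvironmentFactorySketch {
		TestEnvironmentSketch createExecutionEnvironment();
	}
}
```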




[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114692346
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.utils
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+  * Calcite wrapper for user-defined aggregate functions.
+  *
+  * @param name function name (used by SQL parser)
+  * @param aggregateFunction aggregate function to be called
+  * @param returnType the type information of returned value
+  * @param typeFactory type factory for converting Flink's between Calcite's types
+  */
+class AggSqlFunction(
+name: String,
+aggregateFunction: AggregateFunction[_, _],
+returnType: TypeInformation[_],
+typeFactory: FlinkTypeFactory)
+  extends SqlUserDefinedAggFunction(
+new SqlIdentifier(name, SqlParserPos.ZERO),
+createReturnTypeInference(returnType, typeFactory),
+createOperandTypeInference(aggregateFunction, typeFactory),
+createOperandTypeChecker(aggregateFunction),
+// Do not need to provide a calcite aggregateFunction here. Flink aggregateion function
+// will be generated when translating the calcite relnode to flink runtime execution plan
+null
+  ) {
+
+  def getFunction: AggregateFunction[_, _] = aggregateFunction
+}
+
+object AggSqlFunction {
+
+  def apply(
+  name: String,
+  aggregateFunction: AggregateFunction[_, _],
+  returnType: TypeInformation[_],
+  typeFactory: FlinkTypeFactory): AggSqlFunction = {
+
+new AggSqlFunction(name, aggregateFunction, returnType, typeFactory)
+  }
+
+  private[flink] def createOperandTypeInference(
+  aggregateFunction: AggregateFunction[_, _],
+  typeFactory: FlinkTypeFactory)
+  : SqlOperandTypeInference = {
+/**
+  * Operand type inference based on [[AggregateFunction]] given information.
+  */
+new SqlOperandTypeInference {
+  override def inferOperandTypes(
+  callBinding: SqlCallBinding,
+  returnType: RelDataType,
+  operandTypes: Array[RelDataType]): Unit = {
+
+val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo)
+  .getOrElse(throw new ValidationException(s"Operand types of could not be inferred."))
+
+val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
+  .map(typeFactory.createTypeFromTypeInfo)
+
+for (i <- operandTypes.indices) {
+  if (i < inferredTypes.length - 1) {
+operandTypes(i) = inferredTypes(i)
+  } else if (null != inferredTypes.last.getComponentType) {
--- End diff --

Yes, if the last parameter is a collection type, say Array[Int], we want to get the 
component type Int, not the Array type.
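
In plain Java terms (using Class objects as a stand-in for Calcite's RelDataType), the operand-type spreading under discussion looks roughly like this; the helper is illustrative only:

```java
public class OperandTypeSpreadSketch {

	// Spread the inferred parameter types over the actual operands. The last
	// declared parameter of a vararg accumulate() shows up as an array type,
	// so trailing operands take its component type (e.g. Array[Int] -> Int).
	static Class<?>[] spreadOperandTypes(Class<?>[] inferredParameterTypes, int numOperands) {
		Class<?>[] operandTypes = new Class<?>[numOperands];
		Class<?> last = inferredParameterTypes[inferredParameterTypes.length - 1];
		for (int i = 0; i < numOperands; i++) {
			if (i < inferredParameterTypes.length - 1) {
				operandTypes[i] = inferredParameterTypes[i];
			} else if (last.getComponentType() != null) {
				operandTypes[i] = last.getComponentType();	// vararg tail: element type
			} else {
				operandTypes[i] = last;
			}
		}
		return operandTypes;
	}
}
```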



[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114692276
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.utils
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+  * Calcite wrapper for user-defined aggregate functions.
+  *
+  * @param name function name (used by SQL parser)
+  * @param aggregateFunction aggregate function to be called
+  * @param returnType the type information of returned value
+  * @param typeFactory type factory for converting Flink's between Calcite's types
+  */
+class AggSqlFunction(
+name: String,
+aggregateFunction: AggregateFunction[_, _],
+returnType: TypeInformation[_],
+typeFactory: FlinkTypeFactory)
+  extends SqlUserDefinedAggFunction(
+new SqlIdentifier(name, SqlParserPos.ZERO),
+createReturnTypeInference(returnType, typeFactory),
+createOperandTypeInference(aggregateFunction, typeFactory),
+createOperandTypeChecker(aggregateFunction),
+// Do not need to provide a calcite aggregateFunction here. Flink aggregateion function
+// will be generated when translating the calcite relnode to flink runtime execution plan
+null
+  ) {
+
+  def getFunction: AggregateFunction[_, _] = aggregateFunction
+}
+
+object AggSqlFunction {
+
+  def apply(
+  name: String,
+  aggregateFunction: AggregateFunction[_, _],
+  returnType: TypeInformation[_],
+  typeFactory: FlinkTypeFactory): AggSqlFunction = {
+
+new AggSqlFunction(name, aggregateFunction, returnType, typeFactory)
+  }
+
+  private[flink] def createOperandTypeInference(
+  aggregateFunction: AggregateFunction[_, _],
+  typeFactory: FlinkTypeFactory)
+  : SqlOperandTypeInference = {
+/**
+  * Operand type inference based on [[AggregateFunction]] given information.
+  */
+new SqlOperandTypeInference {
+  override def inferOperandTypes(
+  callBinding: SqlCallBinding,
+  returnType: RelDataType,
+  operandTypes: Array[RelDataType]): Unit = {
+
+val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo)
+  .getOrElse(throw new ValidationException(s"Operand types of could not be inferred."))
+
+val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
+  .map(typeFactory.createTypeFromTypeInfo)
+
+for (i <- operandTypes.indices) {
+  if (i < inferredTypes.length - 1) {
+operandTypes(i) = inferredTypes(i)
+  } else if (null != inferredTypes.last.getComponentType) {
+// last argument is a collection, the array type
+operandTypes(i) = inferredTypes.last.getComponentType
+  } else {
+operandTypes(i) = inferredTypes.last
+  }
+}
+  }
+}
+  }
+

[jira] [Updated] (FLINK-6441) Improve the UDTF

2017-05-03 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-6441:
--
Description: According to [FLINK-6334], the UDTF's apply method returns an unbounded 
Table that consists of a LogicalTableFunctionCall and only supports alias 
transformations; this issue focuses on adding expression evaluation in select, e.g. 
table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))  (was: According 
to [FLINK-6334] https://issues.apache.org/jira/browse/FLINK-6334, UDTF's apply method 
return a unbounded Table which consists of a LogicalTableFunctionCall, and only 
supported Alias transformation, this issue is focus on adding evaluating in Select, e.g 
table.join(split('c) as ('a, b) select ('a * 2 as 'a, 'b + 1 as 'b)))

> Improve the UDTF
> 
>
> Key: FLINK-6441
> URL: https://issues.apache.org/jira/browse/FLINK-6441
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> According to [FLINK-6334], the UDTF's apply method returns an unbounded Table that 
> consists of a LogicalTableFunctionCall and only supports alias transformations; this 
> issue focuses on adding expression evaluation in select, e.g. 
> table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))





[jira] [Updated] (FLINK-6441) Improve the UDTF

2017-05-03 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-6441:
--
Description: According to [FLINK-6334] (https://issues.apache.org/jira/browse/FLINK-6334), 
the UDTF's apply method returns an unbounded Table that consists of a 
LogicalTableFunctionCall and only supports alias transformations; this issue focuses on 
adding expression evaluation in select, e.g. 
table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))

> Improve the UDTF
> 
>
> Key: FLINK-6441
> URL: https://issues.apache.org/jira/browse/FLINK-6441
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> According to [FLINK-6334] (https://issues.apache.org/jira/browse/FLINK-6334), the 
> UDTF's apply method returns an unbounded Table that consists of a 
> LogicalTableFunctionCall and only supports alias transformations; this issue focuses 
> on adding expression evaluation in select, e.g. 
> table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))





[jira] [Created] (FLINK-6441) Improve the UDTF

2017-05-03 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-6441:
-

 Summary: Improve the UDTF
 Key: FLINK-6441
 URL: https://issues.apache.org/jira/browse/FLINK-6441
 Project: Flink
  Issue Type: Improvement
Reporter: Ruidong Li
Assignee: Ruidong Li








[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996055#comment-15996055
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114690845
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
@@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation {
 new SqlAvgAggFunction(AVG)
   }
 }
+
+case class UDAGGFunctionCall(
+aggregateFunction: AggregateFunction[_, _],
+args: Seq[Expression])
+  extends Aggregation {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  // Override makeCopy method in TreeNode, to produce vargars properly
+  override def makeCopy(args: Array[AnyRef]): this.type = {
+if (args.length < 1) {
+  throw new TableException("Invalid constructor params")
+}
+val agg = args.head.asInstanceOf[AggregateFunction[_, _]]
+val arg = args.last.asInstanceOf[Seq[Expression]]
+new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type]
+  }
+
+  override def resultType: TypeInformation[_] = 
TypeExtractor.createTypeInfo(
+aggregateFunction, classOf[AggregateFunction[_, _]], 
aggregateFunction.getClass, 0)
+
+  override def validateInput(): ValidationResult = {
+val signature = children.map(_.resultType)
+// look for a signature that matches the input types
+val foundSignature = getAccumulateMethodSignature(aggregateFunction, 
signature)
+if (foundSignature.isEmpty) {
+  ValidationFailure(s"Given parameters do not match any signature. \n" 
+
+  s"Actual: ${signatureToString(signature)} \n" +
+  s"Expected: 
${signaturesToString(aggregateFunction, "accumulate")}")
+} else {
+  ValidationSuccess
+}
+  }
+
+  override def toString(): String = 
s"${aggregateFunction.getClass.getSimpleName}($args)"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
+val typeFactory = 
relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+val sqlFunction = AggSqlFunction(name, aggregateFunction, resultType, 
typeFactory)
--- End diff --

I was trying to keep this name consistent with `ScalarSqlFunction` and 
`TableSqlFunction`.


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114690845
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation {
 new SqlAvgAggFunction(AVG)
   }
 }
+
+case class UDAGGFunctionCall(
+aggregateFunction: AggregateFunction[_, _],
+args: Seq[Expression])
+  extends Aggregation {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  // Override makeCopy method in TreeNode, to produce vargars properly
+  override def makeCopy(args: Array[AnyRef]): this.type = {
+if (args.length < 1) {
+  throw new TableException("Invalid constructor params")
+}
+val agg = args.head.asInstanceOf[AggregateFunction[_, _]]
+val arg = args.last.asInstanceOf[Seq[Expression]]
+new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type]
+  }
+
+  override def resultType: TypeInformation[_] = 
TypeExtractor.createTypeInfo(
+aggregateFunction, classOf[AggregateFunction[_, _]], 
aggregateFunction.getClass, 0)
+
+  override def validateInput(): ValidationResult = {
+val signature = children.map(_.resultType)
+// look for a signature that matches the input types
+val foundSignature = getAccumulateMethodSignature(aggregateFunction, 
signature)
+if (foundSignature.isEmpty) {
+  ValidationFailure(s"Given parameters do not match any signature. \n" 
+
+  s"Actual: ${signatureToString(signature)} \n" +
+  s"Expected: 
${signaturesToString(aggregateFunction, "accumulate")}")
+} else {
+  ValidationSuccess
+}
+  }
+
+  override def toString(): String = 
s"${aggregateFunction.getClass.getSimpleName}($args)"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
+val typeFactory = 
relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+val sqlFunction = AggSqlFunction(name, aggregateFunction, resultType, 
typeFactory)
--- End diff --

I was trying to keep this name consistent with `ScalarSqlFunction` and 
`TableSqlFunction`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3818: [FLINK-5514] [table] Implement an efficient physic...

2017-05-03 Thread godfreyhe
GitHub user godfreyhe opened a pull request:

https://github.com/apache/flink/pull/3818

[FLINK-5514] [table] Implement an efficient physical execution for 
CUBE/ROLLUP/GROUPING SETS



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/godfreyhe/flink FLINK-5514

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3818


commit fa876e41845b4d03bd896349e7a3539d278ccfdc
Author: godfreyhe 
Date:   2017-05-04T02:08:04Z

[FLINK-5514] [table] Implement an efficient physical execution for 
CUBE/ROLLUP/GROUPING SETS




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5514) Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996041#comment-15996041
 ] 

ASF GitHub Bot commented on FLINK-5514:
---

GitHub user godfreyhe opened a pull request:

https://github.com/apache/flink/pull/3818

[FLINK-5514] [table] Implement an efficient physical execution for 
CUBE/ROLLUP/GROUPING SETS



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/godfreyhe/flink FLINK-5514

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3818


commit fa876e41845b4d03bd896349e7a3539d278ccfdc
Author: godfreyhe 
Date:   2017-05-04T02:08:04Z

[FLINK-5514] [table] Implement an efficient physical execution for 
CUBE/ROLLUP/GROUPING SETS




> Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
> ---
>
> Key: FLINK-5514
> URL: https://issues.apache.org/jira/browse/FLINK-5514
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: godfrey he
>
> A first support for GROUPING SETS has been added in FLINK-5303. However, the 
> current runtime implementation is not very efficient as it basically only 
> translates logical operators to physical operators i.e. grouping sets are 
> currently only translated into multiple groupings that are unioned together. 
> A rough design document for this has been created in FLINK-2980.
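
For reference, a self-contained sketch of the kind of query whose physical translation 
this issue wants to improve; the environment setup, example data and column names are 
assumptions for illustration only:

{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Illustrative setup: a three-column table and a GROUPING SETS query.
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val input = env.fromElements(("a", "x", 1), ("a", "y", 2), ("b", "x", 3))
tableEnv.registerDataSet("MyTable", input, 'a, 'b, 'c)

// Currently translated into one grouping per set, unioned together.
val groupingSets = tableEnv.sql(
  """
    |SELECT a, b, SUM(c)
    |FROM MyTable
    |GROUP BY GROUPING SETS ((a, b), (a), ())
  """.stripMargin)
{code}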



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6367) support custom header settings of allow origin

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996039#comment-15996039
 ] 

ASF GitHub Bot commented on FLINK-6367:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3769
  
hi @zentol @StephanEwen, it hasn't been merged in yet. @zentol please check again.


> support custom header settings of allow origin
> --
>
> Key: FLINK-6367
> URL: https://issues.apache.org/jira/browse/FLINK-6367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> `jobmanager.web.access-control-allow-origin`: Enable custom access control 
> parameter for allow origin header, default is `*`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...

2017-05-03 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3769
  
hi @zentol @StephanEwen, it hasn't been merged in yet. @zentol please check again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3759: [FLINK-6366] close KafkaConsumer in FlinkKafkaCons...

2017-05-03 Thread fanyon
Github user fanyon closed the pull request at:

https://github.com/apache/flink/pull/3759


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6366) KafkaConsumer is not closed in FlinkKafkaConsumer09

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996036#comment-15996036
 ] 

ASF GitHub Bot commented on FLINK-6366:
---

Github user fanyon closed the pull request at:

https://github.com/apache/flink/pull/3759


> KafkaConsumer is not closed in FlinkKafkaConsumer09
> ---
>
> Key: FLINK-6366
> URL: https://issues.apache.org/jira/browse/FLINK-6366
> Project: Flink
>  Issue Type: Bug
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> In getKafkaPartitions of FlinkKafkaConsumer09, the KafkaConsumer is created 
> as follows and will not be closed.
> {code:title=FlinkKafkaConsumer09.java|borderStyle=solid}
> protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
>     // read the partitions that belong to the listed topics
>     final List<KafkaTopicPartition> partitions = new ArrayList<>();
>     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
>         for (final String topic: topics) {
>             // get partitions for each topic
>             List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
>             ...
>         }
>     }
>     ...
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5514) Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS

2017-05-03 Thread godfrey he (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-5514:
-

Assignee: godfrey he

> Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
> ---
>
> Key: FLINK-5514
> URL: https://issues.apache.org/jira/browse/FLINK-5514
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: godfrey he
>
> A first support for GROUPING SETS has been added in FLINK-5303. However, the 
> current runtime implementation is not very efficient as it basically only 
> translates logical operators to physical operators i.e. grouping sets are 
> currently only translated into multiple groupings that are unioned together. 
> A rough design document for this has been created in FLINK-2980.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996018#comment-15996018
 ] 

ASF GitHub Bot commented on FLINK-6275:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3724
  
@StephanEwen Thanks. I might close this PR, as there is a refactoring that combines 
```YarnFlinkApplicationMasterRunner``` with 
```AbstractYarnFlinkApplicationMasterRunner``` into one class. See: 
https://issues.apache.org/jira/browse/FLINK-6351


> Unprotected access to resourceManager in 
> YarnFlinkApplicationMasterRunner#runApplicationMaster
> --
>
> Key: FLINK-6275
> URL: https://issues.apache.org/jira/browse/FLINK-6275
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   // wait for resource manager to finish
>   resourceManager.getTerminationFuture().get();
> {code}
> The above is outside synchronized block.
> @GuardedBy indicates that access should be protected.
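
As a rough sketch of the guarded read the ticket asks for (field and lock names below are 
stand-ins, not the actual runner code): take the reference while holding the lock, then 
block on the future outside of it.

{code}
import java.util.concurrent.CompletableFuture

object GuardedAccessSketch {
  // Stand-ins for the runner's @GuardedBy fields; names are assumptions for illustration.
  private val lock = new Object
  private val resourceManagerTermination = new CompletableFuture[Void]()

  def waitForTermination(): Unit = {
    // Read the guarded reference while holding the lock ...
    val future = lock.synchronized {
      resourceManagerTermination
    }
    // ... but wait on it outside the lock so other threads are not blocked.
    future.get()
  }
}
{code}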



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3724: [FLINK-6275] [yarn] Fix unprotected access to resourceMan...

2017-05-03 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3724
  
@StephanEwen Thanks. I might close this PR, as there is a refactoring that combines 
```YarnFlinkApplicationMasterRunner``` with 
```AbstractYarnFlinkApplicationMasterRunner``` into one class. See: 
https://issues.apache.org/jira/browse/FLINK-6351


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995971#comment-15995971
 ] 

ASF GitHub Bot commented on FLINK-6428:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3817

[FLINK-6428][table] Add support DISTINCT in dataStream SQL

With the changes in this PR, the `DISTINCT` keyword is supported in the `SELECT` 
clause of dataStream SQL.

In standard databases there are two places where the `DISTINCT` keyword can be used:
* in the `SELECT` clause, e.g.: `SELECT DISTINCT name FROM table`
* in an aggregate, e.g.: `COUNT([ALL|DISTINCT] expression)`, `SUM([ALL|DISTINCT] expression)`, etc.

This JIRA addresses the `SELECT` clause. As the number of distinct elements grows, the 
limiting factor tends to be the back-end storage (Flink state). In theory, external 
storage is unbounded (the user can control and plan for it), so from this point of view 
DISTINCT over an infinite stream can be supported. In addition, with external storage 
such as RocksDB the user can set a TTL according to the actual amount of business data 
to ensure that the external storage keeps working properly.
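
Since the feature ultimately boils down to keeping per-value keyed state, here is a 
minimal keyed-state sketch of the keyBy/flatMap translation described in the JIRA; it is 
illustrative only, not the code produced by this PR:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Keeps a "seen" flag per key so each distinct value is emitted exactly once.
class DistinctEmitter extends RichFlatMapFunction[Int, Int] {
  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
  }

  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    if (seen.value() == null) {
      seen.update(true)
      out.collect(value)
    }
  }
}

// Usage, assuming `ages: DataStream[Int]` holds the projected column:
// val distinctAges = ages.keyBy(x => x).flatMap(new DistinctEmitter)
```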

- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6428][table] Add support DISTINCT in dataStream SQL")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6428-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3817


commit f1aaffd765f05f8b51801ae1bb66426d99480970
Author: sunjincheng121 
Date:   2017-05-03T03:47:09Z

[FLINK-6428][table] Add support DISTINCT in dataStream SQL




> Add support DISTINCT in dataStream SQL
> --
>
> Key: FLINK-6428
> URL: https://issues.apache.org/jira/browse/FLINK-6428
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add support for DISTINCT in dataStream SQL as follows:
> DATA:
> {code}
> (name, age)
> (kevin, 28),
> (sunny, 6),
> (jack, 6)
> {code}
> SQL:
> {code}
> SELECT DISTINCT age FROM MyTable"
> {code}
> RESULTS:
> {code}
> 28, 6
> {code}
> To DataStream:
> {code}
> inputDS
>   .keyBy() // KeyBy on all fields
>   .flatMap() //  Eliminate duplicate data
> {code}
> [~fhueske] do we need this feature?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3817: [FLINK-6428][table] Add support DISTINCT in dataSt...

2017-05-03 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3817

[FLINK-6428][table] Add support DISTINCT in dataStream SQL

With the changes in this PR, the `DISTINCT` keyword is supported in the `SELECT` 
clause of dataStream SQL.

In standard databases there are two places where the `DISTINCT` keyword can be used:
* in the `SELECT` clause, e.g.: `SELECT DISTINCT name FROM table`
* in an aggregate, e.g.: `COUNT([ALL|DISTINCT] expression)`, `SUM([ALL|DISTINCT] expression)`, etc.

This JIRA addresses the `SELECT` clause. As the number of distinct elements grows, the 
limiting factor tends to be the back-end storage (Flink state). In theory, external 
storage is unbounded (the user can control and plan for it), so from this point of view 
DISTINCT over an infinite stream can be supported. In addition, with external storage 
such as RocksDB the user can set a TTL according to the actual amount of business data 
to ensure that the external storage keeps working properly.

- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6428][table] Add support DISTINCT in dataStream SQL")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6428-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3817


commit f1aaffd765f05f8b51801ae1bb66426d99480970
Author: sunjincheng121 
Date:   2017-05-03T03:47:09Z

[FLINK-6428][table] Add support DISTINCT in dataStream SQL




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-03 Thread Bill Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995858#comment-15995858
 ] 

Bill Liu commented on FLINK-4022:
-

Any update on this feature? Will it be released in 1.3?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit the last offsets of the partitions it is currently responsible for to 
> an external store (or Kafka) before a rebalance; after the rebalance, once the 
> subtasks get the new partitions they will be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.
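
Below is a minimal sketch of the Kafka 0.9+ pattern-subscription API referenced in the 
description; the connection properties and the callback bodies are placeholders, not 
Flink's implementation:

{code}
import java.util.regex.Pattern
import java.util.{Collection => JCollection, Properties}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")    // placeholder
props.setProperty("group.id", "flink-kafka-consumer-group") // placeholder
props.setProperty("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.setProperty("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

// Subscribe to all topics matching "topic-n*"; Kafka reassigns partitions on rebalance.
consumer.subscribe(Pattern.compile("topic-n.*"), new ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    // commit the last offsets of the partitions this subtask currently owns
  }
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = {
    // look up the last committed offsets for the newly assigned partitions
  }
})
{code}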



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995668#comment-15995668
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114651484
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation {
 new SqlAvgAggFunction(AVG)
   }
 }
+
+case class UDAGGFunctionCall(
+aggregateFunction: AggregateFunction[_, _],
+args: Seq[Expression])
+  extends Aggregation {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  // Override makeCopy method in TreeNode, to produce vargars properly
+  override def makeCopy(args: Array[AnyRef]): this.type = {
+if (args.length < 1) {
+  throw new TableException("Invalid constructor params")
+}
+val agg = args.head.asInstanceOf[AggregateFunction[_, _]]
+val arg = args.last.asInstanceOf[Seq[Expression]]
+new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type]
+  }
+
+  override def resultType: TypeInformation[_] = 
TypeExtractor.createTypeInfo(
+aggregateFunction, classOf[AggregateFunction[_, _]], 
aggregateFunction.getClass, 0)
+
+  override def validateInput(): ValidationResult = {
+val signature = children.map(_.resultType)
+// look for a signature that matches the input types
+val foundSignature = getAccumulateMethodSignature(aggregateFunction, 
signature)
+if (foundSignature.isEmpty) {
+  ValidationFailure(s"Given parameters do not match any signature. \n" 
+
+  s"Actual: ${signatureToString(signature)} \n" +
+  s"Expected: 
${signaturesToString(aggregateFunction, "accumulate")}")
--- End diff --

The signature string includes the Accumulator, which should be removed.


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114651484
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation {
 new SqlAvgAggFunction(AVG)
   }
 }
+
+case class UDAGGFunctionCall(
+aggregateFunction: AggregateFunction[_, _],
+args: Seq[Expression])
+  extends Aggregation {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  // Override makeCopy method in TreeNode, to produce vargars properly
+  override def makeCopy(args: Array[AnyRef]): this.type = {
+if (args.length < 1) {
+  throw new TableException("Invalid constructor params")
+}
+val agg = args.head.asInstanceOf[AggregateFunction[_, _]]
+val arg = args.last.asInstanceOf[Seq[Expression]]
+new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type]
+  }
+
+  override def resultType: TypeInformation[_] = 
TypeExtractor.createTypeInfo(
+aggregateFunction, classOf[AggregateFunction[_, _]], 
aggregateFunction.getClass, 0)
+
+  override def validateInput(): ValidationResult = {
+val signature = children.map(_.resultType)
+// look for a signature that matches the input types
+val foundSignature = getAccumulateMethodSignature(aggregateFunction, 
signature)
+if (foundSignature.isEmpty) {
+  ValidationFailure(s"Given parameters do not match any signature. \n" 
+
+  s"Actual: ${signatureToString(signature)} \n" +
+  s"Expected: 
${signaturesToString(aggregateFunction, "accumulate")}")
--- End diff --

The signature string includes the Accumulator, which should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6334.

Resolution: Fixed

Fixed for 1.3 with c969237fce4fe5394e1cfdbb1186db6d73d0

> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995546#comment-15995546
 ] 

ASF GitHub Bot commented on FLINK-6334:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3791


> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

2017-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3791


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6440) Noisy logs from metric fetcher

2017-05-03 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-6440:
---

 Summary: Noisy logs from metric fetcher
 Key: FLINK-6440
 URL: https://issues.apache.org/jira/browse/FLINK-6440
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.3.0
Reporter: Stephan Ewen
Priority: Critical
 Fix For: 1.3.0


In cases where TaskManagers fail, the web frontend in the Job Manager starts 
logging the exception below every few seconds.

I labeled this as critical because it makes debugging such a situation harder: the 
log is flooded with noise.

{code}
2017-05-03 19:37:07,823 WARN  
org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching 
metrics failed.
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@herman:52175/user/MetricQueryService_136f717a6b91e248282cb2937d22088c]]
 after [1 ms]
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at 
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995253#comment-15995253
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114602957
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

Well, in the cases here, we are not really interested in the 
TypeInformation itself but the type class. So it would not really matter if you 
get Row.class from the `GenericTypeInfo` or the `RowTypeInfo`. Nonetheless, I 
think it is a good idea to have this optional method. It would also be 
consistent with the other user-defined functions.
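
To make this concrete, a minimal sketch of an AggregateFunction and how it would be 
registered through the `registerFunction[T, ACC]` method shown in the diff; the 
`SimpleCount` function, the table and the query are assumptions for illustration only:

```scala
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator and aggregate used only to illustrate the API.
class CountAcc {
  var count: Long = 0L
}

class SimpleCount extends AggregateFunction[java.lang.Long, CountAcc] {
  override def createAccumulator(): CountAcc = new CountAcc

  override def getValue(acc: CountAcc): java.lang.Long = acc.count

  // accumulate(...) is resolved reflectively against the query's argument types.
  def accumulate(acc: CountAcc, value: String): Unit = {
    acc.count += 1
  }
}

// Registration and use (assuming a BatchTableEnvironment `tableEnv` and a
// registered table "MyTable" with columns name and age):
// tableEnv.registerFunction("simpleCount", new SimpleCount)
// tableEnv.sql("SELECT age, simpleCount(name) FROM MyTable GROUP BY age")
```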


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114602957
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

Well, in the cases here, we are not really interested in the 
TypeInformation itself but the type class. So it would not really matter if you 
get Row.class from the `GenericTypeInfo` or the `RowTypeInfo`. Nonetheless, I 
think it is a good idea to have this optional method. It would also be 
consistent with the other user-defined functions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995244#comment-15995244
 ] 

ASF GitHub Bot commented on FLINK-6334:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114590134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -93,6 +103,11 @@ class Table(
 * }}}
 */
   def select(fields: Expression*): Table = {
+if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
--- End diff --

I think we can add these checks without touching all methods of `Table`.
We could implement a method that recursively traverses a `LogicalNode` and 
checks whether one of its children is an unbounded table function call. This check 
is performed in the constructor of Table and throws an exception unless the 
`logicalNode` itself is a `LogicalTableFunctionCall` (this is the case if it 
was created with the new constructor or `as()` was applied on it).

That way we can remove all checks in the methods.
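
A small sketch of the suggested traversal, assuming the logical plan classes in 
`org.apache.flink.table.plan.logical`; the method name and the exception message are 
placeholders, not final code:

```scala
import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}

// Returns true if the plan rooted at `node` contains an unbounded table function call.
def containsUnboundedUDTFCall(node: LogicalNode): Boolean = node match {
  case _: LogicalTableFunctionCall => true
  case other => other.children.exists(containsUnboundedUDTFCall)
}

// In the Table constructor (per the suggestion above), roughly:
// if (containsUnboundedUDTFCall(logicalPlan) &&
//     !logicalPlan.isInstanceOf[LogicalTableFunctionCall]) {
//   throw new ValidationException("TableFunctions can only be used in join and select with alias.")
// }
```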


> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995241#comment-15995241
 ] 

ASF GitHub Bot commented on FLINK-6334:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114558126
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +360,45 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* this method is used for create a [[LogicalTableFunctionCall]]
+* @param tableEnv
+* @param udtf a String represent a TableFunction Call e.g "split(c)"
+* @return
+*/
+  def createLogicalFunctionCall(tableEnv: TableEnvironment, udtf: String) 
= {
--- End diff --

Add return type to method


> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995242#comment-15995242
 ] 

ASF GitHub Bot commented on FLINK-6334:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114553215
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -49,11 +49,11 @@ import 
org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, 
FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, 
ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
-import org.apache.flink.table.expressions.{Alias, Expression, 
UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Call, Expression, 
ExpressionParser, TableFunctionCall, UnresolvedFieldReference}
--- End diff --

Remove the unnecessary imports


> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995243#comment-15995243
 ] 

ASF GitHub Bot commented on FLINK-6334:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114556553
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
 ---
@@ -35,6 +36,10 @@ class TableConversions(table: Table) {
   /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
   def toDataSet[T: TypeInformation]: DataSet[T] = {
 
+if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(table)) {
--- End diff --

We should put this check into `Table.getRelNode`. This will catch all cases 
when we try to translate a table.


> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6334) Refactoring UDTF interface

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995245#comment-15995245
 ] 

ASF GitHub Bot commented on FLINK-6334:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114590580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -417,13 +507,45 @@ class Table(
   }
 
   private def join(right: Table, joinPredicate: Option[Expression], 
joinType: JoinType): Table = {
-// check that right table belongs to the same TableEnvironment
-if (right.tableEnv != this.tableEnv) {
+if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
+  throw new ValidationException(
+"TableFunctions can only be followed by Alias. e.g 
table.join(split('c) as ('a, 'b))")
+}
+
+// check that the TableEnvironment of right table is not null
+// and right table belongs to the same TableEnvironment
+if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
--- End diff --

I would change this method as follows:
first we check if `right` is a table function call. If not, we translate the 
join as before. Otherwise, we translate it as a join with a table function.


> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114556553
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
 ---
@@ -35,6 +36,10 @@ class TableConversions(table: Table) {
   /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
   def toDataSet[T: TypeInformation]: DataSet[T] = {
 
+if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(table)) {
--- End diff --

We should put this check into `Table.getRelNode`. This will catch all cases 
when we try to translate a table.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114553215
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -49,11 +49,11 @@ import 
org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, 
FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, 
ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
-import org.apache.flink.table.expressions.{Alias, Expression, 
UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Call, Expression, 
ExpressionParser, TableFunctionCall, UnresolvedFieldReference}
--- End diff --

Remove the unnecessary imports


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114558126
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -358,4 +360,45 @@ object UserDefinedFunctionUtils {
 InstantiationUtil
   .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+* this method is used for create a [[LogicalTableFunctionCall]]
+* @param tableEnv
+* @param udtf a String represent a TableFunction Call e.g "split(c)"
+* @return
+*/
+  def createLogicalFunctionCall(tableEnv: TableEnvironment, udtf: String) 
= {
--- End diff --

Add return type to method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114590580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -417,13 +507,45 @@ class Table(
   }
 
   private def join(right: Table, joinPredicate: Option[Expression], 
joinType: JoinType): Table = {
-// check that right table belongs to the same TableEnvironment
-if (right.tableEnv != this.tableEnv) {
+if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
+  throw new ValidationException(
+"TableFunctions can only be followed by Alias. e.g 
table.join(split('c) as ('a, 'b))")
+}
+
+// check that the TableEnvironment of right table is not null
+// and right table belongs to the same TableEnvironment
+if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
--- End diff --

I would change this method as follows:
first we check if `right` is a table function call. If not, we translate the 
join as before. Otherwise, we translate it as a join with a table function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3791#discussion_r114590134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -93,6 +103,11 @@ class Table(
 * }}}
 */
   def select(fields: Expression*): Table = {
+if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
--- End diff --

I think we can add these checks without touching all methods of `Table`.
We could implement a method that recursively traverses a `LogicalNode` and 
checks whether one of its children is an unbounded table function call. This check 
is performed in the constructor of Table and throws an exception unless the 
`logicalNode` itself is a `LogicalTableFunctionCall` (this is the case if it 
was created with the new constructor or `as()` was applied on it).

That way we can remove all checks in the methods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3793: flink-6033 Support UNNEST query in the stream SQL API

2017-05-03 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3793
  
Thanks for the PR @suez1224. I think we need to rework this PR a little 
bit, before we can merge it. I saw that you had problems using basic type 
arrays such as `Integer[]`. I think we should cover this in a separate issue, 
because adding a type involves changes in multiple files and needs good tests. 

I would propose to introduce a `LogicalUnnestRule` that converts the logical 
plan into a logical `FlinkLogicalCorrelate`. This approach would not require a 
`DataStreamCorrelateUnnestRule` and would work for both batch and streaming.

What do you think?
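
For reference, the shape of query this PR targets, assuming a 1.3-style `tableEnv` and a 
registered table "Users" with columns (name, tags) where `tags` is an array column; the 
names are assumptions for illustration:

```scala
// Illustrative only; `tableEnv` is assumed to be a StreamTableEnvironment.
val unnested = tableEnv.sql(
  """
    |SELECT u.name, t.tag
    |FROM Users AS u, UNNEST(u.tags) AS t (tag)
  """.stripMargin)
```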


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995194#comment-15995194
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r114595396
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Enclosed.class)
+public class DatadogHttpClientTest {
+   public static class TestApiKey {
+   @Test(expected = IllegalArgumentException.class)
+   public void testValidateApiKey() {
+   new DatadogHttpClient("fake_key");
--- End diff --

Makes sense. There's a lot of uncertainty when testing integration with a 3rd 
party - you don't want to depend too much on it, but you have to test it 
somehow. It's hard to find a balance somewhere in between.

I'll make changes as you recommended.


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot of other companies do too.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-05-03 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r114595396
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Enclosed.class)
+public class DatadogHttpClientTest {
+   public static class TestApiKey {
+   @Test(expected = IllegalArgumentException.class)
+   public void testValidateApiKey() {
+   new DatadogHttpClient("fake_key");
--- End diff --

Makes sense. There's a lot of uncertainty in testing integration with a 3rd 
party - you don't want to depend too much on it, but you have to test it 
somehow. It's hard to find a balance somewhere in between.

I'll make changes as you recommended.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995187#comment-15995187
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114593800
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

Yes, the Row type is a good example of type extraction problems. A user 
usually doesn't want to use `GenericType`. Other cases are the need for custom 
serializers or types with complex generics.
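
For illustration (the field names and types below are made up): a function producing 
Row gives the extractor no field information, so the user has to state the schema 
explicitly, e.g. via a RowTypeInfo:

{code}
// Illustrative sketch: the field types below are made up. A Row carries no
// compile-time field information, so reflection-based extraction can only yield
// a generic type; an explicit RowTypeInfo states the schema instead.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class ExplicitRowType {
    static TypeInformation<Row> resultTypeOfMyAggregate() {
        // e.g. an aggregate producing (name: String, total: Long)
        return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}
{code}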


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114593800
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

Yes, the Row type is a good example of type extraction problems. A user 
usually doesn't want to use `GenericType`. Other cases are the need for custom 
serializers or types with complex generics.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6439) Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()

2017-05-03 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6439:
-

 Summary: Unclosed InputStream in 
OperatorSnapshotUtil#readStateHandle()
 Key: FLINK-6439
 URL: https://issues.apache.org/jira/browse/FLINK-6439
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
FileInputStream in = new FileInputStream(path);
DataInputStream dis = new DataInputStream(in);
{code}
Neither in nor dis is closed upon return from the method.

In writeStateHandle(), the OutputStream should be closed in a finally block.
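
A minimal sketch of the suggested fix using try-with-resources (the method body is a 
simplified stand-in for the real deserialization logic, not the actual Flink patch):

{code}
// Minimal sketch, not the actual Flink patch: try-with-resources closes both
// streams even if reading throws. The body is a simplified stand-in for the
// real deserialization logic in readStateHandle().
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class StreamCloseExample {
    static int readVersionHeader(String path) throws IOException {
        try (FileInputStream in = new FileInputStream(path);
             DataInputStream dis = new DataInputStream(in)) {
            return dis.readInt(); // both streams are closed on return or exception
        }
    }
}
{code}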



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114580123
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
cancelables.registerClosable(keyedStateBackend);
 
// restore if we have some old state
-   if (null != restoreStateHandles && null != 
restoreStateHandles.getManagedKeyedState()) {
-   
keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-   }
+   Collection restoreKeyedStateHandles =
+   restoreStateHandles == null ? null : 
restoreStateHandles.getManagedKeyedState();
+
+   keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

Ok, then we can also look at this again later. For now it is ok as is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5720) Deprecate "Folding" in all of DataStream API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995134#comment-15995134
 ] 

ASF GitHub Bot commented on FLINK-5720:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3816

[FLINK-5720] Deprecate DataStream#fold()

This PR deprecates the various `DataStream#fold()` methods and all related 
internal classes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5720_deprecate_folding

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3816


commit 40fd483992f7820f8e6f7b26cd7e95a6d5f71507
Author: zentol 
Date:   2017-05-03T13:49:03Z

[FLINK-5720] Deprecate DataStream#fold()




> Deprecate "Folding" in all of DataStream API
> 
>
> Key: FLINK-5720
> URL: https://issues.apache.org/jira/browse/FLINK-5720
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Folding is an operation that cannot be done incrementally in a distributed 
> way and that also cannot be done on merging windows. Now that we have 
> {{AggregatingState}} and aggregate operations we should deprecate folding in 
> the APIs and deprecate {{FoldingState}}.
> I suggest removing folding completely in Flink 2.0.
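
For illustration, the distinction in plain Java (a standalone sketch, not the Flink 
DataStream or AggregateFunction API): a fold only knows how to add one element to an 
accumulator, while an aggregate also knows how to merge two accumulators, which is what 
merging (e.g. session) windows require.

{code}
// Standalone sketch (plain Java, not the Flink interfaces) of why an aggregate
// can serve merging windows while a fold cannot: the aggregate defines merge().
class AvgAccumulator {
    long sum;
    long count;

    void add(long value) {                        // incremental update, a fold can do this too
        sum += value;
        count++;
    }

    AvgAccumulator merge(AvgAccumulator other) {  // combining two partial results,
        AvgAccumulator merged = new AvgAccumulator();  // which a fold has no counterpart for
        merged.sum = this.sum + other.sum;
        merged.count = this.count + other.count;
        return merged;
    }

    double getResult() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}
{code}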



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3816: [FLINK-5720] Deprecate DataStream#fold()

2017-05-03 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3816

[FLINK-5720] Deprecate DataStream#fold()

This PR deprecates the various `DataStream#fold()` methods and all related 
internal classes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5720_deprecate_folding

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3816


commit 40fd483992f7820f8e6f7b26cd7e95a6d5f71507
Author: zentol 
Date:   2017-05-03T13:49:03Z

[FLINK-5720] Deprecate DataStream#fold()




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995105#comment-15995105
 ] 

ASF GitHub Bot commented on FLINK-6364:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114579898
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set newSstFileNames;
+
+   private final Map sstFiles;
+
+   private final Map miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set newSstFileNames,
+   Map sstFiles,
+   Map miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
+   this.operatorIdentifier = operatorIdentifier;
+   this.keyGroupRange = keyGroupRange;
+   this.newSstFileNames = newSstFileNames;
+   this.sstFiles = sstFiles;
+   this.miscFiles = miscFiles;
+   this.metaStateHandle = metaStateHandle;
+   this.registered = false;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   public Map getSstFiles() {
+   return sstFiles;
+   }
+
+   public Map getMiscFiles() {
+   return miscFiles;
+   }
+
+   public StreamStateHandle getMetaStateHandle() {
+   return metaStateHandle;
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+   return this;
+   } else {
+   return null;
+   }
+   }
+
+   @Override
+   public void discardState() throws Exception {
+
+   try {
+   metaStateHandle.discardState();
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard meta data.", e);
+   }
+
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard mis

[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114581240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

The idea would be to always check `getResultType()` first and only use the 
type extractor if the method is not implemented. So you would not need to 
enforce a TypeExtractor failure for the tests.
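
A minimal sketch of that lookup order (hypothetical names; `userProvidedType` stands in 
for what an optional `getResultType()` contract method would return, this is not the 
actual Flink code):

{code}
// Hypothetical sketch of the proposed lookup order, not the actual Flink implementation.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class ResultTypeResolution {
    // userProvidedType stands in for what an optional getResultType() contract method returns.
    static <T> TypeInformation<T> resolveResultType(TypeInformation<T> userProvidedType,
                                                    Class<T> resultClass) {
        if (userProvidedType != null) {
            return userProvidedType;                   // explicit type wins, e.g. a RowTypeInfo
        }
        return TypeExtractor.getForClass(resultClass); // reflection-based fallback only if needed
    }
}
{code}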


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...

2017-05-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3692
  
Alright, this makes sense. The common denominator is that every DNS name 
will have an optional task placeholder (`_TASK`) and everything else is static, 
right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5974) Support Mesos DNS

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995123#comment-15995123
 ] 

ASF GitHub Bot commented on FLINK-5974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3692
  
Alright, this makes sense. The common denominator is that every DNS name 
will have an optional task placeholder (`_TASK`) and everything else is static, 
right?


> Support Mesos DNS
> -
>
> Key: FLINK-5974
> URL: https://issues.apache.org/jira/browse/FLINK-5974
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>
> In certain Mesos/DCOS environments, the slave hostnames aren't resolvable.  
> For this and other reasons, Mesos DNS names would ideally be used for 
> communication within the Flink cluster, not the hostname discovered via 
> `InetAddress.getLocalHost`.
> Some parts of Flink are already configurable in this respect, notably 
> `jobmanager.rpc.address`.  However, the Mesos AppMaster doesn't use that 
> setting for everything (e.g. the artifact server); it uses the hostname.
> Similarly, the `taskmanager.hostname` setting isn't used in Mesos deployment 
> mode.   To effectively use Mesos DNS, the TM should use 
> `..mesos` as its hostname.   This could be derived 
> from an interpolated configuration string.
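
For illustration, an interpolated hostname could be resolved like this (a rough sketch; 
the pattern string and the `_TASK` placeholder handling are assumptions for illustration, 
not actual Flink configuration semantics):

{code}
// Illustrative sketch only: the pattern and the _TASK placeholder are assumptions,
// not actual Flink configuration semantics.
public class HostnameInterpolation {
    static String resolveHostname(String pattern, String taskId) {
        // e.g. pattern = "_TASK.flink.mesos" and taskId = "taskmanager-00042"
        return pattern.replace("_TASK", taskId);
    }

    public static void main(String[] args) {
        System.out.println(resolveHostname("_TASK.flink.mesos", "taskmanager-00042"));
        // prints: taskmanager-00042.flink.mesos
    }
}
{code}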



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5974) Support Mesos DNS

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995099#comment-15995099
 ] 

ASF GitHub Bot commented on FLINK-5974:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3692
  
@tillrohrmann Mesos DNS is not the only DNS solution for Mesos; it is 
merely the DCOS solution.  By using an interpolated string, the name is fully 
configurable.


> Support Mesos DNS
> -
>
> Key: FLINK-5974
> URL: https://issues.apache.org/jira/browse/FLINK-5974
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>
> In certain Mesos/DCOS environments, the slave hostnames aren't resolvable.  
> For this and other reasons, Mesos DNS names would ideally be used for 
> communication within the Flink cluster, not the hostname discovered via 
> `InetAddress.getLocalHost`.
> Some parts of Flink are already configurable in this respect, notably 
> `jobmanager.rpc.address`.  However, the Mesos AppMaster doesn't use that 
> setting for everything (e.g. the artifact server); it uses the hostname.
> Similarly, the `taskmanager.hostname` setting isn't used in Mesos deployment 
> mode.   To effectively use Mesos DNS, the TM should use 
> `..mesos` as its hostname.   This could be derived 
> from an interpolated configuration string.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995108#comment-15995108
 ] 

ASF GitHub Bot commented on FLINK-6364:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114580123
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
cancelables.registerClosable(keyedStateBackend);
 
// restore if we have some old state
-   if (null != restoreStateHandles && null != 
restoreStateHandles.getManagedKeyedState()) {
-   
keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-   }
+   Collection restoreKeyedStateHandles =
+   restoreStateHandles == null ? null : 
restoreStateHandles.getManagedKeyedState();
+
+   keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

Ok, then we can also look at this again later. For now it is ok as is.


> Implement incremental checkpointing in RocksDBStateBackend
> --
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is based on LSM trees, which record updates in new sst files, and all 
> sst files are immutable. By only materializing those new sst files, we can 
> significantly improve the performance of checkpointing.
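
For illustration, the heart of the incremental approach (a minimal sketch; the real 
backend additionally tracks shared-state registration, misc files and the metadata handle):

{code}
// Illustrative sketch of the core idea only; the actual backend also handles
// shared-state registration, misc files and the metadata handle.
import java.util.HashSet;
import java.util.Set;

public class IncrementalSstSelection {
    // Returns the sst files created since the last checkpoint, which are the only
    // ones that need to be uploaded; previously uploaded files are immutable and
    // can simply be referenced again.
    static Set<String> newSstFiles(Set<String> currentSstFiles, Set<String> alreadyUploaded) {
        Set<String> toUpload = new HashSet<>(currentSstFiles);
        toUpload.removeAll(alreadyUploaded);
        return toUpload;
    }
}
{code}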



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995115#comment-15995115
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114581240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

The idea would be to always check `getResultType()` first and only use the 
type extractor if the method is not implemented. So you would not need to 
enforce a TypeExtractor failure for the tests.


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114579898
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set newSstFileNames;
+
+   private final Map sstFiles;
+
+   private final Map miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set newSstFileNames,
+   Map sstFiles,
+   Map miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
+   this.operatorIdentifier = operatorIdentifier;
+   this.keyGroupRange = keyGroupRange;
+   this.newSstFileNames = newSstFileNames;
+   this.sstFiles = sstFiles;
+   this.miscFiles = miscFiles;
+   this.metaStateHandle = metaStateHandle;
+   this.registered = false;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   public Map getSstFiles() {
+   return sstFiles;
+   }
+
+   public Map getMiscFiles() {
+   return miscFiles;
+   }
+
+   public StreamStateHandle getMetaStateHandle() {
+   return metaStateHandle;
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+   return this;
+   } else {
+   return null;
+   }
+   }
+
+   @Override
+   public void discardState() throws Exception {
+
+   try {
+   metaStateHandle.discardState();
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard meta data.", e);
+   }
+
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard misc file state.", 
e);
+   }
+
+   if (!registered) {
+   for (String newSstFileName : newSstFileNames) {
+   StreamStateHandle handle = 
sstFiles.get(newSstFileName);
+ 

[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995101#comment-15995101
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114579772
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/DSetUDAGGITCase.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.java.{DataSet => JDataSet, 
ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => 
ScalaExecutionEnv}
+import 
org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, 
WeightedAvgWithMergeAndReset}
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.mockito.Mockito.{mock, when}
+
+import scala.collection.JavaConverters._
+
+/**
+  * We only test some aggregations until better testing of constructed 
DataSet
+  * programs is possible.
+  */
+@RunWith(classOf[Parameterized])
+class DSetUDAGGITCase(configMode: TableConfigMode)
--- End diff --

Sounds good to me. I did not spread the UDAGG tests across all the different 
aggregation test files, as I feel the current agg tests are a little messy. It 
always takes me a while to find the right test cases among all the different test 
files. Putting the UDAGG test cases into one file helps me easily understand 
what kinds of tests have been covered. I think we need to think about how to 
reorganize our agg test structure. Considering the short time before the feature 
freeze, let us keep the current structure (I will split the UDAGG tests into the 
different agg test files) and massage the tests later.


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114579772
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/DSetUDAGGITCase.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.java.{DataSet => JDataSet, 
ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => 
ScalaExecutionEnv}
+import 
org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, 
WeightedAvgWithMergeAndReset}
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.mockito.Mockito.{mock, when}
+
+import scala.collection.JavaConverters._
+
+/**
+  * We only test some aggregations until better testing of constructed 
DataSet
+  * programs is possible.
+  */
+@RunWith(classOf[Parameterized])
+class DSetUDAGGITCase(configMode: TableConfigMode)
--- End diff --

Sounds good to me. I did not spread the UDAGG tests across all the different 
aggregation test files, as I feel the current agg tests are a little messy. It 
always takes me a while to find the right test cases among all the different test 
files. Putting the UDAGG test cases into one file helps me easily understand 
what kinds of tests have been covered. I think we need to think about how to 
reorganize our agg test structure. Considering the short time before the feature 
freeze, let us keep the current structure (I will split the UDAGG tests into the 
different agg test files) and massage the tests later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...

2017-05-03 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/3692
  
@tillrohrmann Mesos DNS is not the only DNS solution for Mesos; it is 
merely the DCOS solution.  By using an interpolated string, the name is fully 
configurable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6217) ContaineredTaskManagerParameters sets off heap memory size incorrectly

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995087#comment-15995087
 ] 

ASF GitHub Bot commented on FLINK-6217:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3648
  
Thanks for your contribution @haohui and the review @StephanEwen. Changes 
look good to me. Merging this PR.


> ContaineredTaskManagerParameters sets off heap memory size incorrectly
> --
>
> Key: FLINK-6217
> URL: https://issues.apache.org/jira/browse/FLINK-6217
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Thanks [~bill.liu8904] for triaging the issue.
> When {{taskmanager.memory.off-heap}} is disabled, we observed that the total 
> memory that Flink allocates exceeds the total memory of the container:
> For an 8G container the JobManager starts the container with the following 
> parameters:
> {noformat}
> $JAVA_HOME/bin/java -Xms6072m -Xmx6072m -XX:MaxDirectMemorySize=6072m ...
> {noformat}
> The total amount of heap memory plus the off-heap memory exceeds the total 
> amount of memory of the container. As a result YARN occasionally kills the 
> container.
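
For illustration, the arithmetic behind the report (a rough sketch; the 8192 MB container 
size is an assumption, the 6072m values are taken from the command line above):

{code}
// Rough illustration of the reported sizing problem; the 8192 MB container size
// is an assumption, the 6072m values come from the example command line above.
public class ContainerMemoryCheck {
    public static void main(String[] args) {
        long containerMb = 8192;
        long heapMb = 6072;              // -Xmx6072m
        long maxDirectMb = 6072;         // -XX:MaxDirectMemorySize=6072m
        long worstCaseMb = heapMb + maxDirectMb;
        System.out.println(worstCaseMb + " MB possible vs " + containerMb + " MB container");
        // prints: 12144 MB possible vs 8192 MB container -> YARN may kill the container
    }
}
{code}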



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3648: [FLINK-6217] ContaineredTaskManagerParameters sets off-he...

2017-05-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3648
  
Thanks for your contribution @haohui and the review @StephanEwen. Changes 
look good to me. Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995080#comment-15995080
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114576600
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

Thanks @twalthr. If I understand you correctly, you suggest creating a 
contract method `getResultType` for UDAGG, so that the user can provide the 
result type in case the type extraction fails. Sounds good to me. 
Can you give some examples of when the type extraction will fail (for 
instance a Row type?) and why it may fail, so that I can add some test cases? 
 


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114576600
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Registers an [[AggregateFunction]] under a unique name in the 
TableEnvironment's catalog.
+* Registered functions can be referenced in Table API and SQL queries.
+*
+* @param name The name under which the function is registered.
+* @param f The AggregateFunction to register.
+* @tparam T The type of the output value.
+* @tparam ACC The type of aggregate accumulator.
+*/
+  def registerFunction[T, ACC](
+  name: String,
+  f: AggregateFunction[T, ACC])
+  : Unit = {
+implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --

Thanks @twalthr. If I understand you correctly, you suggest creating a 
contract method `getResultType` for UDAGG, so that the user can provide the 
result type in case the type extraction fails. Sounds good to me. 
Can you give some examples of when the type extraction will fail (for 
instance a Row type?) and why it may fail, so that I can add some test cases? 
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5718) Handle JVM Fatal Exceptions in Tasks

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995049#comment-15995049
 ] 

ASF GitHub Bot commented on FLINK-5718:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3811
  
Thanks for your contribution @zimmermatt. Changes look good to me. Merging 
this PR.


> Handle JVM Fatal Exceptions in Tasks
> 
>
> Key: FLINK-5718
> URL: https://issues.apache.org/jira/browse/FLINK-5718
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The TaskManager catches and handles all types of exceptions right now (all 
> {{Throwables}}). The intention behind that is:
>   - Many {{Error}} subclasses are recoverable for the TaskManagers, such as 
> failure to load/link user code
>   - We want to give eager notifications to the JobManager in case something 
> in a task goes wrong.
> However, there are some exceptions which should probably simply terminate the 
> JVM, if caught in the task thread, because they may leave the JVM in a 
> dysfunctional limbo state:
>   - {{OutOfMemoryError}}
>   - {{InternalError}}
>   - {{UnknownError}}
>   - {{ZipError}}
> These are basically the subclasses of {{VirtualMachineError}}, except for 
> {{StackOverflowError}}, which is recoverable and usually recovered already by 
> the time the exception has been thrown and the stack unwound.
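
A minimal sketch of the described classification (assuming this predicate shape; not 
necessarily the exact check used in Flink):

{code}
// Minimal sketch of the described classification, assuming this predicate shape;
// not necessarily the exact check used in Flink.
public class FatalErrorCheck {
    static boolean isJvmFatalError(Throwable t) {
        // VirtualMachineError covers OutOfMemoryError, InternalError, UnknownError and ZipError;
        // StackOverflowError is excluded because the stack is already unwound when it is caught.
        return t instanceof VirtualMachineError && !(t instanceof StackOverflowError);
    }
}
{code}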



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3811: [FLINK-5718] [core] TaskManagers exit the JVM on fatal ex...

2017-05-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3811
  
Thanks for your contribution @zimmermatt. Changes look good to me. Merging 
this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995044#comment-15995044
 ] 

ASF GitHub Bot commented on FLINK-5906:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114571910
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -327,4 +332,56 @@ object ProjectionTranslator {
   }
   }
 
+  /**
+* Find and replace UDAGG function Call to UDAGGFunctionCall
+*
+* @param fieldthe expression to check
+* @param tableEnv the TableEnvironment
+* @return an expression with correct UDAGGFunctionCall type for UDAGG 
functions
+*/
+  def replaceUDAGGFunctionCall(field: Expression, tableEnv: 
TableEnvironment): Expression = {
--- End diff --

We will not have the chance to execute LogicalNode#resolveExpressions 
before getting aggNames, projectFields, etc. I actually tried an alternative 
approach that conducts the replacement in extractAggregationsAndProperties and 
replaceAggregationsAndProperties (we have to check and handle the UDAGG call 
carefully in both functions); it works, but I do not like that design. It makes 
the logic of these two methods less clean. Also, an over aggregate does not call 
extractAggregationsAndProperties and 
replaceAggregationsAndProperties. So I decided to implement a separate function 
to handle the UDAGGFunctionCall replacement.


> Add support to register UDAGG in Table and SQL API
> --
>
> Key: FLINK-5906
> URL: https://issues.apache.org/jira/browse/FLINK-5906
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...

2017-05-03 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3809#discussion_r114571910
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -327,4 +332,56 @@ object ProjectionTranslator {
   }
   }
 
+  /**
+* Find and replace UDAGG function Call to UDAGGFunctionCall
+*
+* @param fieldthe expression to check
+* @param tableEnv the TableEnvironment
+* @return an expression with correct UDAGGFunctionCall type for UDAGG 
functions
+*/
+  def replaceUDAGGFunctionCall(field: Expression, tableEnv: 
TableEnvironment): Expression = {
--- End diff --

We will not have the chance to execute LogicalNode#resolveExpressions 
before getting aggNames, projectFields, etc. I actually tried an alternative 
approach that conducts the replacement in extractAggregationsAndProperties and 
replaceAggregationsAndProperties (we have to check and handle the UDAGG call 
carefully in both functions); it works, but I do not like that design. It makes 
the logic of these two methods less clean. Also, an over aggregate does not call 
extractAggregationsAndProperties and 
replaceAggregationsAndProperties. So I decided to implement a separate function 
to handle the UDAGGFunctionCall replacement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3763: [FLINK-6372][scripts] Fix change scala version of ...

2017-05-03 Thread soniclavier
Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/3763


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6372) change-scala-version.sh does not change version for flink-gelly-examples

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995011#comment-15995011
 ] 

ASF GitHub Bot commented on FLINK-6372:
---

Github user soniclavier closed the pull request at:

https://github.com/apache/flink/pull/3763


> change-scala-version.sh does not change version for flink-gelly-examples
> 
>
> Key: FLINK-6372
> URL: https://issues.apache.org/jira/browse/FLINK-6372
> Project: Flink
>  Issue Type: Bug
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> change-scala-version.sh does not change the version for flink-gelly-examples 
> in bin.xml. This is causing the build to fail when using Scala 2.11, since it's 
> looking for flink-gelly-examples_2.10.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995008#comment-15995008
 ] 

ASF GitHub Bot commented on FLINK-6364:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565991
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
cancelables.registerClosable(keyedStateBackend);
 
// restore if we have some old state
-   if (null != restoreStateHandles && null != 
restoreStateHandles.getManagedKeyedState()) {
-   
keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-   }
+   Collection restoreKeyedStateHandles =
+   restoreStateHandles == null ? null : 
restoreStateHandles.getManagedKeyedState();
+
+   keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

I attempted to put the state restore in the constructor as we discussed, 
but it turns out to be impossible. 

All state backends have to be registered in the task so that they 
can be closed when the task is canceled. If we put the restoring in the 
constructor of the backends, the construction of a backend may block 
(e.g., due to access to HDFS). Since the construction would not be completed yet, 
the backend would not be registered and hence would not be closed.
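
The ordering constraint this implies, sketched with hypothetical stand-in types (not the 
actual Flink classes):

{code}
// Hypothetical sketch of the ordering constraint; the types below are
// illustrative stand-ins, not the actual Flink classes.
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public class RegisterBeforeRestore {
    interface RestorableBackend extends Closeable {
        void restore() throws IOException;   // may block, e.g. while reading from HDFS
    }

    static void openBackend(RestorableBackend backend, List<Closeable> cancelables) throws IOException {
        // 1) register first, so a concurrent cancel can always close the backend
        cancelables.add(backend);
        // 2) only then run the potentially blocking restore
        backend.restore();
    }
}
{code}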


> Implement incremental checkpointing in RocksDBStateBackend
> --
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is based on LSM trees, which record updates in new sst files, and all 
> sst files are immutable. By only materializing those new sst files, we can 
> significantly improve the performance of checkpointing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565991
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend(
cancelables.registerClosable(keyedStateBackend);
 
// restore if we have some old state
-   if (null != restoreStateHandles && null != 
restoreStateHandles.getManagedKeyedState()) {
-   
keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState());
-   }
+   Collection restoreKeyedStateHandles =
+   restoreStateHandles == null ? null : 
restoreStateHandles.getManagedKeyedState();
+
+   keyedStateBackend.restore(restoreKeyedStateHandles);
--- End diff --

I attempted to put the state restore in the constructor as we discussed, 
but it turns out to be impossible. 

All state backends have to be registered in the task so that they 
can be closed when the task is canceled. If we put the restoring in the 
constructor of the backends, the construction of a backend may block 
(e.g., due to access to HDFS). Since the construction would not be completed yet, 
the backend would not be registered and hence would not be closed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995007#comment-15995007
 ] 

ASF GitHub Bot commented on FLINK-6364:
---

Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565968
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set newSstFileNames;
+
+   private final Map sstFiles;
+
+   private final Map miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set newSstFileNames,
+   Map sstFiles,
+   Map miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
+   this.operatorIdentifier = operatorIdentifier;
+   this.keyGroupRange = keyGroupRange;
+   this.newSstFileNames = newSstFileNames;
+   this.sstFiles = sstFiles;
+   this.miscFiles = miscFiles;
+   this.metaStateHandle = metaStateHandle;
+   this.registered = false;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   public Map<String, StreamStateHandle> getSstFiles() {
+   return sstFiles;
+   }
+
+   public Map<String, StreamStateHandle> getMiscFiles() {
+   return miscFiles;
+   }
+
+   public StreamStateHandle getMetaStateHandle() {
+   return metaStateHandle;
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+   return this;
+   } else {
+   return null;
+   }
+   }
+
+   @Override
+   public void discardState() throws Exception {
+
+   try {
+   metaStateHandle.discardState();
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard meta data.", e);
+   }
+
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard misc f

[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...

2017-05-03 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3801#discussion_r114565968
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link 
RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, 
CompositeStateHandle {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+   private static final long serialVersionUID = -8328808513197388231L;
+
+   private final JobID jobId;
+
+   private final String operatorIdentifier;
+
+   private final KeyGroupRange keyGroupRange;
+
+   private final Set<String> newSstFileNames;
+
+   private final Map<String, StreamStateHandle> sstFiles;
+
+   private final Map<String, StreamStateHandle> miscFiles;
+
+   private final StreamStateHandle metaStateHandle;
+
+   private boolean registered;
+
+   RocksDBKeyedStateHandle(
+   JobID jobId,
+   String operatorIdentifier,
+   KeyGroupRange keyGroupRange,
+   Set<String> newSstFileNames,
+   Map<String, StreamStateHandle> sstFiles,
+   Map<String, StreamStateHandle> miscFiles,
+   StreamStateHandle metaStateHandle) {
+
+   this.jobId = jobId;
+   this.operatorIdentifier = operatorIdentifier;
+   this.keyGroupRange = keyGroupRange;
+   this.newSstFileNames = newSstFileNames;
+   this.sstFiles = sstFiles;
+   this.miscFiles = miscFiles;
+   this.metaStateHandle = metaStateHandle;
+   this.registered = false;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+   return keyGroupRange;
+   }
+
+   public Map<String, StreamStateHandle> getSstFiles() {
+   return sstFiles;
+   }
+
+   public Map<String, StreamStateHandle> getMiscFiles() {
+   return miscFiles;
+   }
+
+   public StreamStateHandle getMetaStateHandle() {
+   return metaStateHandle;
+   }
+
+   @Override
+   public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+   if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+   return this;
+   } else {
+   return null;
+   }
+   }
+
+   @Override
+   public void discardState() throws Exception {
+
+   try {
+   metaStateHandle.discardState();
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard meta data.", e);
+   }
+
+   try {
+   
StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+   } catch (Exception e) {
+   LOG.warn("Could not properly discard misc file state.", 
e);
+   }
+
+   if (!registered) {
+   for (String newSstFileName : newSstFileNames) {
+   StreamStateHandle handle = 
sstFiles.get(newSstFileName);
+

[jira] [Commented] (FLINK-5974) Support Mesos DNS

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995000#comment-15995000
 ] 

ASF GitHub Bot commented on FLINK-5974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3692
  
Why do we actually make `TaskManager` hostname configurable via 
`mesos.resourcemanager.tasks.hostname`? Isn't the mesos-dns hostname already 
uniquely defined by the `TaskID` and the framework name contained in 
`FrameworkInfo`? 

If the `FrameworkInfo` does not contain the service name, then we should 
only need to specify this information. Then we could rename the configuration 
parameter to `mesos.service-name` and concatenate `taskId`, `service-name`, and 
`mesos`, where `taskId` is retrieved from the `TaskID` instance. 
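
A minimal sketch of that derivation, assuming a helper along these lines (the 
class and method names here are illustrative only, not the actual Flink/Mesos 
integration API):

```java
import org.apache.mesos.Protos;

class MesosDnsNames {

    // Mesos DNS resolves tasks as <task>.<framework>.mesos, so the TaskManager
    // hostname could be derived from the task id and the configured service name.
    static String hostnameFor(Protos.TaskID taskId, String serviceName) {
        return taskId.getValue() + "." + serviceName + ".mesos";
    }
}
```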


> Support Mesos DNS
> -
>
> Key: FLINK-5974
> URL: https://issues.apache.org/jira/browse/FLINK-5974
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>
> In certain Mesos/DCOS environments, the slave hostnames aren't resolvable.  
> For this and other reasons, Mesos DNS names would ideally be used for 
> communication within the Flink cluster, not the hostname discovered via 
> `InetAddress.getLocalHost`.
> Some parts of Flink are already configurable in this respect, notably 
> `jobmanager.rpc.address`.  However, the Mesos AppMaster doesn't use that 
> setting for everything (e.g. artifact server), it uses the hostname.
> Similarly, the `taskmanager.hostname` setting isn't used in Mesos deployment 
> mode.   To effectively use Mesos DNS, the TM should use 
> `..mesos` as its hostname.   This could be derived 
> from an interpolated configuration string.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...

2017-05-03 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3692
  
Why do we actually make `TaskManager` hostname configurable via 
`mesos.resourcemanager.tasks.hostname`? Isn't the mesos-dns hostname already 
uniquely defined by the `TaskID` and the framework name contained in 
`FrameworkInfo`? 

If the `FrameworkInfo` does not contain the service name, then we should 
only need to specify this information. Then we could rename the configuration 
parameter to `mesos.service-name` and concatenate `taskId`, `service-name`, and 
`mesos`, where `taskId` is retrieved from the `TaskID` instance. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3802: Add Evenly Graph Generator to Flink Gelly

2017-05-03 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3802
  
What makes this graph 'evenly'? See 
`org.apache.flink.graph.drivers.input.CompleteGraph` for creating an input for 
`Runner` (the default class executed when doing a `/bin/flink run` on the 
flink-gelly-examples jar; see 
[usage](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/gelly/index.html)).
 Also need to update 
[`graph_generators.md`](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/gelly/graph_generators.html).
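
For reference, a minimal generator usage in the style of the existing Gelly docs 
(based on the `CompleteGraph` generator; the vertex count and class name below are 
just placeholders for whatever the new generator ends up being called):

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.generator.CompleteGraph;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

public class GeneratorExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build a complete graph on 5 vertices; a new generator would expose the same
        // generate() pattern and also register a drivers.input class for the Runner.
        Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, 5).generate();

        System.out.println(graph.numberOfVertices() + " vertices, "
                + graph.numberOfEdges() + " edges");
    }
}
```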


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6438) Expand docs home page a little

2017-05-03 Thread David Anderson (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson reassigned FLINK-6438:
-

Assignee: David Anderson

> Expand docs home page a little
> --
>
> Key: FLINK-6438
> URL: https://issues.apache.org/jira/browse/FLINK-6438
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
>
> The idea is to improve the documentation home page by adding a few links to 
> valuable items that are too easily overlooked.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6438) Expand docs home page a little

2017-05-03 Thread David Anderson (JIRA)
David Anderson created FLINK-6438:
-

 Summary: Expand docs home page a little
 Key: FLINK-6438
 URL: https://issues.apache.org/jira/browse/FLINK-6438
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: David Anderson
Priority: Minor


The idea is to improve the documentation home page by adding a few links to 
valuable items that are too easily overlooked.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994987#comment-15994987
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3778
  
@zentol Thanks for reviewing! 😃 

I merged this also on master, so that we have backwards compatibility tests 
from 1.2 to the current master, which was the reason for this whole exercise.


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0, 1.2.2
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3778: [FLINK-5969] Add savepoint backwards compatibility tests ...

2017-05-03 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3778
  
@zentol Thanks for reviewing! 😃 

I merged this also on master, so that we have backwards compatibility tests 
from 1.2 to the current master, which was the reason for this whole exercise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-05-03 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5969.
---
   Resolution: Fixed
Fix Version/s: 1.2.2

Implemented on release-1.2 in:
 - 852a710b4d91bdd319238c87d227c51a904070a7
 - 53432e0690510aa5838165d4958777d872668be4
 - c9ba4f0313346d96db6258b533041da180f5471e
 - c89d4b43213d7d3e2ad184787061fe3178be5691
 - c43fc2a14fa9ece5f3a80c8f25e6d99a2450f671
 - 62601b4978582f05e5fadec83ce18eeec5842686
 - 611434c6fa8e53cac25dd93f568d2670ec4ead72
 - 52fb578953fb571f5787c8f69fe2ac4525488a3d
 - a3ccffcb0a1b3e936fff31a61dddf1cada8ab99b
 - f68f6542bb2ba0171883d27c9ac75c74301ddf88

Implemented on master in:
 - 821ec80d72308746aed307498be157b58b7b65e9
 - fb7793f033cfa0d6d77ef25a6c518a5a203ebb82
 - 2c6377f257cafcbaea3a5bf33dd27852ed05afec
 - 84eea72295eda5e7289deb5221c7b990b7b65883
 - e40f2e184c6e57d4346312f969a64727389e92fa
 - 0ecb5d0050b84ba48105836288d43ce4c4749459
 - 44e472f6cffd75213c3b7373bd622b734f89f0b0
 - 2779197f237446e3bff4e9e15f90c24d721c8ab4
 - 9ed98f2e5a32fb14de03b9a8ea1dd45851cc3a7e
 - 1882c90505b0d25775b969cc025c8a3087b82f37
 - 6f8b3c6158a87c14f2fdb3446092df367131e194
 - cced07a28622016ca1ee2d5b316423701c9a986c

> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0, 1.2.2
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994974#comment-15994974
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3778
  
Manually merged


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

