[jira] [Commented] (FLINK-6367) support custom header settings of allow origin
[ https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996257#comment-15996257 ] ASF GitHub Bot commented on FLINK-6367: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3769 Patience please... > support custom header settings of allow origin > -- > > Key: FLINK-6367 > URL: https://issues.apache.org/jira/browse/FLINK-6367 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Reporter: shijinkui >Assignee: shijinkui > > `jobmanager.web.access-control-allow-origin`: Enable custom access control > parameter for allow origin header, default is `*`. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
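The option under discussion can be illustrated with a flink-conf.yaml fragment. This is only a sketch based on the key and default named in the ticket (`jobmanager.web.access-control-allow-origin`, default `*`); the example origin value is hypothetical.

```yaml
# Default: allow any origin to call the web frontend (current behavior).
# jobmanager.web.access-control-allow-origin: "*"

# Proposed: restrict the Access-Control-Allow-Origin response header to a
# single trusted origin (example value; substitute your own dashboard's origin):
jobmanager.web.access-control-allow-origin: "https://dashboard.example.com"
```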
[jira] [Commented] (FLINK-6439) Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
[ https://issues.apache.org/jira/browse/FLINK-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996245#comment-15996245 ] ASF GitHub Bot commented on FLINK-6439: --- GitHub user fanyon opened a pull request: https://github.com/apache/flink/pull/3819 [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running: $ git pull https://github.com/fanyon/flink FLINK-6439 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3819.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3819

commit de723c7eb9ad9b8c8c27d6b884d47faa7d324dfd Author: mengji.fy Date: 2017-05-04T05:37:01Z [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

> Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
> --
>
> Key: FLINK-6439
> URL: https://issues.apache.org/jira/browse/FLINK-6439
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Assignee: Fang Yong
> Priority: Minor
>
> {code}
> FileInputStream in = new FileInputStream(path);
> DataInputStream dis = new DataInputStream(in);
> {code}
> Neither in nor dis is closed upon return from the method.
> In writeStateHandle(), the OutputStream should be closed in a finally block.
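The fix the ticket asks for (closing both streams on every exit path) is most idiomatically expressed with try-with-resources rather than an explicit finally block. A minimal, self-contained sketch; the class and method names here are illustrative, not the actual OperatorSnapshotUtil API:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class StreamCloseSketch {

    // Closing the outermost DataOutputStream also closes the wrapped
    // FileOutputStream, so one try-with-resources entry is enough.
    static void writeValue(String path, int value) throws IOException {
        try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(path))) {
            dos.writeInt(value);
        } // closed on every exit path, including exceptions
    }

    static int readValue(String path) throws IOException {
        try (DataInputStream dis = new DataInputStream(new FileInputStream(path))) {
            return dis.readInt();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("state", ".bin");
        writeValue(tmp.toString(), 42);
        System.out.println(readValue(tmp.toString())); // prints 42
        Files.delete(tmp);
    }
}
```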
[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996233#comment-15996233 ] ASF GitHub Bot commented on FLINK-6364: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3801 @StefanRRichter Thanks a lot for your review. I have updated the pull request as suggested. The following changes were made:

1. Remove the checkpoint type for incremental checkpoints. Support for incremental checkpointing is now a configurable feature in `RocksDBKeyedStateBackend`, just like asynchronous checkpointing in `HeapKeyedStateBackend`. An incremental checkpoint is performed if the feature is enabled and the checkpoint to perform is not a savepoint.
2. Rename `RocksDBKeyedStateHandle` to `RocksDBIncrementalKeyedStateHandle` and do some refactoring.
3. Allow `KeyedStateHandle` to register shared states.
4. Maintain the information of the last completed checkpoint via the notification of `AbstractStreamOperator`.
5. Parameterize `RocksDBStateBackendTest` to test the cleanup of resources in both full and incremental checkpointing.
6. Parameterize `PartitionedStateCheckpointingITCase` to test snapshotting and restoring with different backend settings.

I'd appreciate it if you could take a look at these changes. Any comment is welcome.

> Implement incremental checkpointing in RocksDBStateBackend
> --
>
> Key: FLINK-6364
> URL: https://issues.apache.org/jira/browse/FLINK-6364
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because
> RocksDB is based on LSM trees, which record updates in new sst files, and all
> sst files are immutable. By only materializing those new sst files, we can
> significantly improve the performance of checkpointing.
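The idea in the issue description can be sketched independently of Flink's API: because sst files are immutable, an incremental checkpoint only needs to upload the files that were not already part of the last completed checkpoint (the "new sst files" the PR tracks). A minimal illustration with hypothetical names, not the actual backend code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class IncrementalSketch {

    // Files present now but absent at the last completed checkpoint are the
    // only ones that must be materialized; immutable files already uploaded
    // for an earlier checkpoint can simply be referenced again.
    static Set<String> newFilesSince(Set<String> baseSstFiles, Set<String> currentSstFiles) {
        Set<String> fresh = new HashSet<>(currentSstFiles);
        fresh.removeAll(baseSstFiles);
        return fresh;
    }

    public static void main(String[] args) {
        Set<String> base = new HashSet<>(Arrays.asList("000001.sst", "000002.sst"));
        Set<String> current = new HashSet<>(Arrays.asList("000001.sst", "000002.sst", "000003.sst"));
        System.out.println(newFilesSince(base, current)); // prints [000003.sst]
    }
}
```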
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114703946 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + private List> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + 
} + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map miscFiles = restoreStateHandle.getM
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114703775 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map baseSstFiles; + + private List> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + private FSDataInputStream inputStream = null; + private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + // new sst files since the last completed checkpoint + private Set newSstFileNames = new HashSet<>(); + + // handles to the sst files in the current snapshot + private Map sstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = 
checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + return outputStream.closeAndGetHandle(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + inputStream = null; + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + outputStream = null; + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + serializ
[jira] [Assigned] (FLINK-6439) Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
[ https://issues.apache.org/jira/browse/FLINK-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-6439: Assignee: Fang Yong

> Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
> --
>
> Key: FLINK-6439
> URL: https://issues.apache.org/jira/browse/FLINK-6439
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Assignee: Fang Yong
> Priority: Minor
[GitHub] flink pull request #3569: [flink-6036]Let catalog support partition
Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3569#discussion_r114697342

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -44,6 +43,8 @@ case class ExternalCatalogTable(
     properties: JMap[String, String] = new JHashMap(),
     stats: TableStats = null,
     comment: String = null,
+    partitionColumnNames: JLinkedHashSet[String] = new JLinkedHashSet(),
+    isPartitioned: Boolean = false,

--- End diff --

add comments for ``partitionColumnNames`` and ``isPartitioned``

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-6381) Unnecessary synchronizing object in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6381: Description:

It seems that there are currently two places that should not use {{synchronized}} on {{pendingFilesPerCheckpoint}}, as it is only a restored state object for a checkpoint and the data structure is not shared between different threads. The code in {{BucketingSink}} is as follows.

{code}
private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
  ...
  synchronized (restoredState.pendingFilesPerCheckpoint) {
    restoredState.pendingFilesPerCheckpoint.clear();
  }
  ...
}
{code}

and

{code}
private void handleRestoredBucketState(State restoredState) {
  ...
  synchronized (bucketState.pendingFilesPerCheckpoint) {
    bucketState.pendingFilesPerCheckpoint.clear();
  }
}
{code}

Hi, [~kkl0u]. Is there anything else we should add here? Would you mind taking a more thorough look at this class? Thanks, I really appreciate it.

> Unnecessary synchronizing object in BucketingSink
> -
>
> Key: FLINK-6381
> URL: https://issues.apache.org/jira/browse/FLINK-6381
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector
> Reporter: mingleizhang
> Assignee: mingleizhang
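The ticket's claim, that synchronizing on a restored state object is unnecessary when no other thread can see it, can be sketched as follows. BucketState here is a stand-in for illustration, not Flink's actual class:

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreSketch {

    // Stand-in for the restored state object discussed in FLINK-6381.
    static class BucketState {
        final List<String> pendingFilesPerCheckpoint = new ArrayList<>();
    }

    // Before: synchronized (state.pendingFilesPerCheckpoint) { ...clear()... }
    // After: a plain call. The object was just deserialized by the restoring
    // task's own thread and has not been published to any other thread, so
    // the lock guards nothing.
    static void handleRestoredBucketState(BucketState state) {
        state.pendingFilesPerCheckpoint.clear();
    }

    public static void main(String[] args) {
        BucketState s = new BucketState();
        s.pendingFilesPerCheckpoint.add("part-0-0.pending");
        handleRestoredBucketState(s);
        System.out.println(s.pendingFilesPerCheckpoint.size()); // prints 0
    }
}
```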
[jira] [Commented] (FLINK-6346) Migrate from Java serialization for GenericWriteAheadSink's state
[ https://issues.apache.org/jira/browse/FLINK-6346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996133#comment-15996133 ] ASF GitHub Bot commented on FLINK-6346: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3812 @zhangminglei @zentol thanks for your suggestions; I have updated the code. As discussed in https://github.com/apache/flink/pull/3750#issuecomment-298869900, it may not be necessary to register two states as the current approach does, and older versions of the data could be converted in some other way, such as with upgrade tools. I look forward to hearing your thoughts after you have thought about it. @tzulitai

> Migrate from Java serialization for GenericWriteAheadSink's state
> -
>
> Key: FLINK-6346
> URL: https://issues.apache.org/jira/browse/FLINK-6346
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Fang Yong
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration
> for {{GenericWriteAheadSink}}.
[GitHub] flink pull request #3569: [flink-6036]Let catalog support partition
Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3569#discussion_r114695239 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap, HashMap => JHashMap} + +import org.apache.flink.table.plan.stats.TablePartitionStats + +object ExternalCatalogTypes { + + /** +* external table partition specification. +* Key is partition column name, value is partition column value. +*/ + type PartitionSpec = JLinkedHashMap[String, String] +} + +/** + * Partition definition of an external Catalog table + * + * @param partitionSpec partition specification + * @param propertiespartition properties + * @param stats partition statistics + */ +case class ExternalCatalogTablePartition( --- End diff -- this file is same as flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala, remove it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3810: [Flink-6397] MultipleProgramsTestBase does not res...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/3810#discussion_r114693043

--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java ---
@@ -80,29 +80,36 @@
 	protected final TestExecutionMode mode;
-
+	private TestEnvironment testEnvironment;
+
+	private CollectionTestEnvironment collectionTestEnvironment;
+
 	public MultipleProgramsTestBase(TestExecutionMode mode) {
 		this.mode = mode;
-
+
 		switch(mode){
 			case CLUSTER:
-				new TestEnvironment(cluster, 4).setAsContext();
+				testEnvironment = new TestEnvironment(cluster, 4);

--- End diff --

Sounds reasonable.
[GitHub] flink issue #3810: [Flink-6397] MultipleProgramsTestBase does not reset Cont...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/3810 @StephanEwen I agree that a static unset method would be a much easier implementation. Do you think it's acceptable that the unset method is static but set method is not static in TestEnvironment? Or we can implement the set method as static too, but that will make more changes.
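The mixed static/instance pattern discussed above can be sketched in a few lines. This is a hypothetical illustration, not the actual Flink `TestEnvironment` API: `setAsContext()` stays an instance method because it installs a particular environment, while `unsetAsContext()` can be static because clearing the context needs no reference to the installing instance.

```java
// Hypothetical sketch (names are made up, not Flink's API) of an instance
// setAsContext() paired with a static unsetAsContext().
public class ContextEnvSketch {
    // stands in for the global context slot that setAsContext() fills
    private static ContextEnvSketch current;

    private final int parallelism;

    public ContextEnvSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    // instance method: installs this particular environment as the context
    public void setAsContext() {
        current = this;
    }

    // static method: resets the context from any cleanup hook, without
    // needing the instance that originally called setAsContext()
    public static void unsetAsContext() {
        current = null;
    }

    public static ContextEnvSketch getCurrent() {
        return current;
    }

    public int getParallelism() {
        return parallelism;
    }
}
```

A teardown hook can then call `ContextEnvSketch.unsetAsContext()` unconditionally, which is the simplification suggested in the thread.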
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996078#comment-15996078 ] ASF GitHub Bot commented on FLINK-5906: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114692346 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.utils + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction +import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ + +/** + * Calcite wrapper for user-defined aggregate functions. + * + * @param name function name (used by SQL parser) + * @param aggregateFunction aggregate function to be called + * @param returnType the type information of returned value + * @param typeFactory type factory for converting Flink's between Calcite's types + */ +class AggSqlFunction( +name: String, +aggregateFunction: AggregateFunction[_, _], +returnType: TypeInformation[_], +typeFactory: FlinkTypeFactory) + extends SqlUserDefinedAggFunction( +new SqlIdentifier(name, SqlParserPos.ZERO), +createReturnTypeInference(returnType, typeFactory), +createOperandTypeInference(aggregateFunction, typeFactory), +createOperandTypeChecker(aggregateFunction), +// Do not need to provide a calcite aggregateFunction here. 
Flink aggregateion function +// will be generated when translating the calcite relnode to flink runtime execution plan +null + ) { + + def getFunction: AggregateFunction[_, _] = aggregateFunction +} + +object AggSqlFunction { + + def apply( + name: String, + aggregateFunction: AggregateFunction[_, _], + returnType: TypeInformation[_], + typeFactory: FlinkTypeFactory): AggSqlFunction = { + +new AggSqlFunction(name, aggregateFunction, returnType, typeFactory) + } + + private[flink] def createOperandTypeInference( + aggregateFunction: AggregateFunction[_, _], + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { +/** + * Operand type inference based on [[AggregateFunction]] given information. + */ +new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + +val operandTypeInfo = getOperandTypeInfo(callBinding) + +val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) + .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + +val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) + .map(typeFactory.createTypeFromTypeInfo) + +for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { +operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { --- End diff --
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996076#comment-15996076 ] ASF GitHub Bot commented on FLINK-5906: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114692276 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.functions.utils + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction +import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ + +/** + * Calcite wrapper for user-defined aggregate functions. + * + * @param name function name (used by SQL parser) + * @param aggregateFunction aggregate function to be called + * @param returnType the type information of returned value + * @param typeFactory type factory for converting Flink's between Calcite's types + */ +class AggSqlFunction( +name: String, +aggregateFunction: AggregateFunction[_, _], +returnType: TypeInformation[_], +typeFactory: FlinkTypeFactory) + extends SqlUserDefinedAggFunction( +new SqlIdentifier(name, SqlParserPos.ZERO), +createReturnTypeInference(returnType, typeFactory), +createOperandTypeInference(aggregateFunction, typeFactory), +createOperandTypeChecker(aggregateFunction), +// Do not need to provide a calcite aggregateFunction here. 
Flink aggregateion function +// will be generated when translating the calcite relnode to flink runtime execution plan +null + ) { + + def getFunction: AggregateFunction[_, _] = aggregateFunction +} + +object AggSqlFunction { + + def apply( + name: String, + aggregateFunction: AggregateFunction[_, _], + returnType: TypeInformation[_], + typeFactory: FlinkTypeFactory): AggSqlFunction = { + +new AggSqlFunction(name, aggregateFunction, returnType, typeFactory) + } + + private[flink] def createOperandTypeInference( + aggregateFunction: AggregateFunction[_, _], + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { +/** + * Operand type inference based on [[AggregateFunction]] given information. + */ +new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + +val operandTypeInfo = getOperandTypeInfo(callBinding) + +val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) + .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + +val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) + .map(typeFactory.createTypeFromTypeInfo) + +for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { +operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { +// last argu
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114692346 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.utils + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction +import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ + +/** + * Calcite wrapper for user-defined aggregate functions. 
+ * + * @param name function name (used by SQL parser) + * @param aggregateFunction aggregate function to be called + * @param returnType the type information of returned value + * @param typeFactory type factory for converting Flink's between Calcite's types + */ +class AggSqlFunction( +name: String, +aggregateFunction: AggregateFunction[_, _], +returnType: TypeInformation[_], +typeFactory: FlinkTypeFactory) + extends SqlUserDefinedAggFunction( +new SqlIdentifier(name, SqlParserPos.ZERO), +createReturnTypeInference(returnType, typeFactory), +createOperandTypeInference(aggregateFunction, typeFactory), +createOperandTypeChecker(aggregateFunction), +// Do not need to provide a calcite aggregateFunction here. Flink aggregateion function +// will be generated when translating the calcite relnode to flink runtime execution plan +null + ) { + + def getFunction: AggregateFunction[_, _] = aggregateFunction +} + +object AggSqlFunction { + + def apply( + name: String, + aggregateFunction: AggregateFunction[_, _], + returnType: TypeInformation[_], + typeFactory: FlinkTypeFactory): AggSqlFunction = { + +new AggSqlFunction(name, aggregateFunction, returnType, typeFactory) + } + + private[flink] def createOperandTypeInference( + aggregateFunction: AggregateFunction[_, _], + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { +/** + * Operand type inference based on [[AggregateFunction]] given information. 
+ */ +new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + +val operandTypeInfo = getOperandTypeInfo(callBinding) + +val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) + .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + +val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) + .map(typeFactory.createTypeFromTypeInfo) + +for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { +operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { --- End diff -- Yes, if the last parameter is a component, say Array[Int]. We want to get the type of Int, not the type of Array.
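The rationale above — when the last parameter of an accumulate-style method is an array such as `Array[Int]` (a compiled vararg), trailing operands should take the element type `Int`, not the array type — can be illustrated with plain Java reflection. The class and method names below are made up for the sketch; this is not Flink code.

```java
import java.lang.reflect.Method;

// Illustrative sketch of the component-type branch discussed in the review:
// for trailing (vararg) operands, use the array parameter's component type.
public class VarargOperandSketch {

    // hypothetical accumulate method; the int... vararg compiles to int[]
    public void accumulate(long acc, int... values) {
    }

    // mirrors the reviewed logic: if the last parameter is an array,
    // each trailing operand takes its component type
    public static Class<?> trailingOperandType(Method m) {
        Class<?>[] params = m.getParameterTypes();
        Class<?> last = params[params.length - 1];
        return last.isArray() ? last.getComponentType() : last;
    }
}
```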
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114692276 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.utils + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction +import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ + +/** + * Calcite wrapper for user-defined aggregate functions. 
+ * + * @param name function name (used by SQL parser) + * @param aggregateFunction aggregate function to be called + * @param returnType the type information of returned value + * @param typeFactory type factory for converting Flink's between Calcite's types + */ +class AggSqlFunction( +name: String, +aggregateFunction: AggregateFunction[_, _], +returnType: TypeInformation[_], +typeFactory: FlinkTypeFactory) + extends SqlUserDefinedAggFunction( +new SqlIdentifier(name, SqlParserPos.ZERO), +createReturnTypeInference(returnType, typeFactory), +createOperandTypeInference(aggregateFunction, typeFactory), +createOperandTypeChecker(aggregateFunction), +// Do not need to provide a calcite aggregateFunction here. Flink aggregateion function +// will be generated when translating the calcite relnode to flink runtime execution plan +null + ) { + + def getFunction: AggregateFunction[_, _] = aggregateFunction +} + +object AggSqlFunction { + + def apply( + name: String, + aggregateFunction: AggregateFunction[_, _], + returnType: TypeInformation[_], + typeFactory: FlinkTypeFactory): AggSqlFunction = { + +new AggSqlFunction(name, aggregateFunction, returnType, typeFactory) + } + + private[flink] def createOperandTypeInference( + aggregateFunction: AggregateFunction[_, _], + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { +/** + * Operand type inference based on [[AggregateFunction]] given information. 
+ */ +new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + +val operandTypeInfo = getOperandTypeInfo(callBinding) + +val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) + .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + +val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) + .map(typeFactory.createTypeFromTypeInfo) + +for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { +operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { +// last argument is a collection, the array type +operandTypes(i) = inferredTypes.last.getComponentType + } else { +operandTypes(i) = inferredTypes.last + } +} + } +} + } +
[jira] [Updated] (FLINK-6441) Improve the UDTF
[ https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-6441: -- Description: According to [FLINK-6334], UDTF's apply method return a unbounded Table which consists of a LogicalTableFunctionCall, and only supported Alias transformation, this issue is focus on adding evaluating in Select, e.g table.join(split('c) as ('a, b) select ('a * 2 as 'a, 'b + 1 as 'b)) (was: According to [FLINK-6334] https://issues.apache.org/jira/browse/FLINK-6334, UDTF's apply method return a unbounded Table which consists of a LogicalTableFunctionCall, and only supported Alias transformation, this issue is focus on adding evaluating in Select, e.g table.join(split('c) as ('a, b) select ('a * 2 as 'a, 'b + 1 as 'b))) > Improve the UDTF > > > Key: FLINK-6441 > URL: https://issues.apache.org/jira/browse/FLINK-6441 > Project: Flink > Issue Type: Improvement >Reporter: Ruidong Li >Assignee: Ruidong Li > > According to [FLINK-6334], UDTF's apply method return a unbounded Table which > consists of a LogicalTableFunctionCall, and only supported Alias > transformation, this issue is focus on adding evaluating in Select, e.g > table.join(split('c) as ('a, b) select ('a * 2 as 'a, 'b + 1 as 'b)) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6441) Improve the UDTF
[ https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li updated FLINK-6441: -- Description: According to [FLINK-6334] https://issues.apache.org/jira/browse/FLINK-6334, UDTF's apply method return a unbounded Table which consists of a LogicalTableFunctionCall, and only supported Alias transformation, this issue is focus on adding evaluating in Select, e.g table.join(split('c) as ('a, b) select ('a * 2 as 'a, 'b + 1 as 'b)) > Improve the UDTF > > > Key: FLINK-6441 > URL: https://issues.apache.org/jira/browse/FLINK-6441 > Project: Flink > Issue Type: Improvement >Reporter: Ruidong Li >Assignee: Ruidong Li > > According to [FLINK-6334] https://issues.apache.org/jira/browse/FLINK-6334, > UDTF's apply method return a unbounded Table which consists of a > LogicalTableFunctionCall, and only supported Alias transformation, this issue > is focus on adding evaluating in Select, e.g table.join(split('c) as ('a, b) > select ('a * 2 as 'a, 'b + 1 as 'b)) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6441) Improve the UDTF
Ruidong Li created FLINK-6441: - Summary: Improve the UDTF Key: FLINK-6441 URL: https://issues.apache.org/jira/browse/FLINK-6441 Project: Flink Issue Type: Improvement Reporter: Ruidong Li Assignee: Ruidong Li -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996055#comment-15996055 ] ASF GitHub Bot commented on FLINK-5906: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114690845 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala --- @@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation { new SqlAvgAggFunction(AVG) } } + +case class UDAGGFunctionCall( +aggregateFunction: AggregateFunction[_, _], +args: Seq[Expression]) + extends Aggregation { + + override private[flink] def children: Seq[Expression] = args + + // Override makeCopy method in TreeNode, to produce vargars properly + override def makeCopy(args: Array[AnyRef]): this.type = { +if (args.length < 1) { + throw new TableException("Invalid constructor params") +} +val agg = args.head.asInstanceOf[AggregateFunction[_, _]] +val arg = args.last.asInstanceOf[Seq[Expression]] +new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type] + } + + override def resultType: TypeInformation[_] = TypeExtractor.createTypeInfo( +aggregateFunction, classOf[AggregateFunction[_, _]], aggregateFunction.getClass, 0) + + override def validateInput(): ValidationResult = { +val signature = children.map(_.resultType) +// look for a signature that matches the input types +val foundSignature = getAccumulateMethodSignature(aggregateFunction, signature) +if (foundSignature.isEmpty) { + ValidationFailure(s"Given parameters do not match any signature. 
\n" + + s"Actual: ${signatureToString(signature)} \n" + + s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}") +} else { + ValidationSuccess +} + } + + override def toString(): String = s"${aggregateFunction.getClass.getSimpleName}($args)" + + override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { +val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] +val sqlFunction = AggSqlFunction(name, aggregateFunction, resultType, typeFactory) --- End diff -- I was trying to keep this name consistent with `ScalaSqlFunction` and `TableSqlFunction` > Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114690845 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala --- @@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation { new SqlAvgAggFunction(AVG) } } + +case class UDAGGFunctionCall( +aggregateFunction: AggregateFunction[_, _], +args: Seq[Expression]) + extends Aggregation { + + override private[flink] def children: Seq[Expression] = args + + // Override makeCopy method in TreeNode, to produce vargars properly + override def makeCopy(args: Array[AnyRef]): this.type = { +if (args.length < 1) { + throw new TableException("Invalid constructor params") +} +val agg = args.head.asInstanceOf[AggregateFunction[_, _]] +val arg = args.last.asInstanceOf[Seq[Expression]] +new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type] + } + + override def resultType: TypeInformation[_] = TypeExtractor.createTypeInfo( +aggregateFunction, classOf[AggregateFunction[_, _]], aggregateFunction.getClass, 0) + + override def validateInput(): ValidationResult = { +val signature = children.map(_.resultType) +// look for a signature that matches the input types +val foundSignature = getAccumulateMethodSignature(aggregateFunction, signature) +if (foundSignature.isEmpty) { + ValidationFailure(s"Given parameters do not match any signature. 
\n" + + s"Actual: ${signatureToString(signature)} \n" + + s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}") +} else { + ValidationSuccess +} + } + + override def toString(): String = s"${aggregateFunction.getClass.getSimpleName}($args)" + + override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { +val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] +val sqlFunction = AggSqlFunction(name, aggregateFunction, resultType, typeFactory) --- End diff -- I was trying to keep this name consistent with `ScalaSqlFunction` and `TableSqlFunction`
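The `makeCopy` override quoted in this thread exists because a generic tree-copy passes constructor arguments as a flat array, and the node must rebuild itself from the first slot (the function) and the last slot (the argument sequence). A plain-Java analogue of that pattern — illustrative only, not Flink's `TreeNode` API:

```java
import java.util.List;

// Plain-Java analogue of the makeCopy override above: rebuild a call node
// from a generic constructor-argument array, taking the function from the
// head slot and the argument list from the last slot.
final class UdaggCallSketch {
    final String functionName;
    final List<String> args;

    UdaggCallSketch(String functionName, List<String> args) {
        this.functionName = functionName;
        this.args = args;
    }

    @SuppressWarnings("unchecked")
    static UdaggCallSketch makeCopy(Object[] ctorArgs) {
        if (ctorArgs.length < 1) {
            // same guard as in the quoted Scala code
            throw new IllegalArgumentException("Invalid constructor params");
        }
        String fn = (String) ctorArgs[0];
        List<String> as = (List<String>) ctorArgs[ctorArgs.length - 1];
        return new UdaggCallSketch(fn, as);
    }
}
```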
[GitHub] flink pull request #3818: [FLINK-5514] [table] Implement an efficient physic...
GitHub user godfreyhe opened a pull request: https://github.com/apache/flink/pull/3818 [FLINK-5514] [table] Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS You can merge this pull request into a Git repository by running: $ git pull https://github.com/godfreyhe/flink FLINK-5514 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3818.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3818 commit fa876e41845b4d03bd896349e7a3539d278ccfdc Author: godfreyhe Date: 2017-05-04T02:08:04Z [FLINK-5514] [table] Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
[jira] [Commented] (FLINK-5514) Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
[ https://issues.apache.org/jira/browse/FLINK-5514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996041#comment-15996041 ] ASF GitHub Bot commented on FLINK-5514: --- GitHub user godfreyhe opened a pull request: https://github.com/apache/flink/pull/3818 [FLINK-5514] [table] Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS You can merge this pull request into a Git repository by running: $ git pull https://github.com/godfreyhe/flink FLINK-5514 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3818.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3818 commit fa876e41845b4d03bd896349e7a3539d278ccfdc Author: godfreyhe Date: 2017-05-04T02:08:04Z [FLINK-5514] [table] Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS > Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS > --- > > Key: FLINK-5514 > URL: https://issues.apache.org/jira/browse/FLINK-5514 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: godfrey he > > A first support for GROUPING SETS has been added in FLINK-5303. However, the > current runtime implementation is not very efficient as it basically only > translates logical operators to physical operators i.e. grouping sets are > currently only translated into multiple groupings that are unioned together. > A rough design document for this has been created in FLINK-2980. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
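The inefficiency the ticket describes — each grouping set evaluated as its own grouping and the partial results unioned — can be sketched in a few lines of Java. One full pass over the input per grouping set is exactly the cost an efficient physical execution would avoid; the row shape and names below are illustrative, not Flink's runtime.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the naive strategy described in the issue: evaluate each
// grouping set as an independent grouping over the full input, then union the
// results. Note the outer loop re-scans all rows once per grouping set.
public class GroupingSetsSketch {

    public static Map<List<String>, Long> countByGroupingSets(
            List<Map<String, String>> rows, List<List<String>> groupingSets) {
        Map<List<String>, Long> result = new LinkedHashMap<>();
        for (List<String> keys : groupingSets) {     // one full pass per set
            for (Map<String, String> row : rows) {
                List<String> groupKey = new ArrayList<>();
                for (String k : keys) {
                    groupKey.add(k + "=" + row.get(k));
                }
                result.merge(groupKey, 1L, Long::sum);
            }
        }
        return result;
    }
}
```

An efficient execution would instead compute the finest grouping once and roll partial aggregates up into the coarser sets, which is the direction FLINK-2980's design document sketches.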
[jira] [Commented] (FLINK-6367) support custom header settings of allow origin
[ https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996039#comment-15996039 ] ASF GitHub Bot commented on FLINK-6367: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3769 hi, @zentol @StephanEwen It didn't merge in. @zentol Please check again. > support custom header settings of allow origin > -- > > Key: FLINK-6367 > URL: https://issues.apache.org/jira/browse/FLINK-6367 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Reporter: shijinkui >Assignee: shijinkui > > `jobmanager.web.access-control-allow-origin`: Enable custom access control > parameter for allow origin header, default is `*`. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3769 hi, @zentol @StephanEwen It didn't merge in. @zentol Please check again.
[GitHub] flink pull request #3759: [FLINK-6366] close KafkaConsumer in FlinkKafkaCons...
Github user fanyon closed the pull request at: https://github.com/apache/flink/pull/3759
[jira] [Commented] (FLINK-6366) KafkaConsumer is not closed in FlinkKafkaConsumer09
[ https://issues.apache.org/jira/browse/FLINK-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996036#comment-15996036 ] ASF GitHub Bot commented on FLINK-6366: --- Github user fanyon closed the pull request at: https://github.com/apache/flink/pull/3759 > KafkaConsumer is not closed in FlinkKafkaConsumer09 > --- > > Key: FLINK-6366 > URL: https://issues.apache.org/jira/browse/FLINK-6366 > Project: Flink > Issue Type: Bug >Reporter: Fang Yong >Assignee: Fang Yong > > In getKafkaPartitions of FlinkKafkaConsumer09, the KafkaConsumer is created > as follows and will not be closed. > {code:title=FlinkKafkaConsumer09.java|borderStyle=solid} > protected List getKafkaPartitions(List topics) { > // read the partitions that belong to the listed topics > final List partitions = new ArrayList<>(); > try (KafkaConsumer consumer = new > KafkaConsumer<>(this.properties)) { > for (final String topic: topics) { > // get partitions for each topic > List partitionsForTopic = > consumer.partitionsFor(topic); > ... > } > } > ... > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
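The quoted snippet relies on try-with-resources to guarantee the consumer is closed. That guarantee can be demonstrated stand-alone; the `Resource` class below is a stand-in, not the Kafka client API, and `close()` runs automatically both on the normal exit path and when the body throws.

```java
// Stand-alone demonstration of the try-with-resources pattern from the issue:
// close() is invoked automatically when the block exits, even on exceptions.
public class TryWithResourcesSketch {

    static class Resource implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // returns the resource so a caller can observe that it was closed
    public static Resource useAndLeave() {
        Resource observed;
        try (Resource r = new Resource()) {
            observed = r;
            // ... e.g. read partition metadata here ...
        }
        return observed;
    }

    public static Resource useAndThrow() {
        Resource observed = null;
        try (Resource r = new Resource()) {
            observed = r;
            throw new IllegalStateException("boom");
        } catch (IllegalStateException expected) {
            // close() already ran before this catch block was entered
        }
        return observed;
    }
}
```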
[jira] [Assigned] (FLINK-5514) Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS
[ https://issues.apache.org/jira/browse/FLINK-5514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-5514: - Assignee: godfrey he > Implement an efficient physical execution for CUBE/ROLLUP/GROUPING SETS > --- > > Key: FLINK-5514 > URL: https://issues.apache.org/jira/browse/FLINK-5514 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: godfrey he > > A first support for GROUPING SETS has been added in FLINK-5303. However, the > current runtime implementation is not very efficient as it basically only > translates logical operators to physical operators i.e. grouping sets are > currently only translated into multiple groupings that are unioned together. > A rough design document for this has been created in FLINK-2980.
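For context on why the union-based plan is inefficient: CUBE over n fields expands into 2^n grouping sets, so a plan that unions one full grouping per set re-reads the input 2^n times. A small sketch of the expansion itself (illustrative only, not Flink's implementation):

```java
import java.util.ArrayList;
import java.util.List;

class GroupingSets {
    // Expands CUBE(f1, ..., fn) into its 2^n grouping sets,
    // one per subset of the grouped fields (bitmask enumeration).
    static List<List<String>> cube(List<String> fields) {
        List<List<String>> sets = new ArrayList<>();
        int n = fields.size();
        for (int mask = 0; mask < (1 << n); mask++) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    set.add(fields.get(i));
                }
            }
            sets.add(set);
        }
        return sets;
    }
}
```

An efficient physical execution would compute all 2^n aggregates in a single pass over the input instead of one union branch per grouping set.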
[jira] [Commented] (FLINK-6275) Unprotected access to resourceManager in YarnFlinkApplicationMasterRunner#runApplicationMaster
[ https://issues.apache.org/jira/browse/FLINK-6275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996018#comment-15996018 ] ASF GitHub Bot commented on FLINK-6275: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3724 @StephanEwen Thanks. I might close this PR as there is a refactor to ```YarnFlinkApplicationMasterRunner``` by combining with ```AbstractYarnFlinkApplicationMasterRunner``` in one class. See : https://issues.apache.org/jira/browse/FLINK-6351 > Unprotected access to resourceManager in > YarnFlinkApplicationMasterRunner#runApplicationMaster > -- > > Key: FLINK-6275 > URL: https://issues.apache.org/jira/browse/FLINK-6275 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > > {code} > // wait for resource manager to finish > resourceManager.getTerminationFuture().get(); > {code} > The above is outside synchronized block. > @GuardedBy indicates that access should be protected.
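The usual fix for this class of bug is to read the `@GuardedBy` field under the lock, but block on the returned future outside the lock. A minimal, self-contained sketch of that idiom (hypothetical class names; this is not the actual Flink fix):

```java
import java.util.concurrent.CompletableFuture;

class MasterRunner {
    private final Object lock = new Object();
    // Conceptually @GuardedBy("lock"): read and written only while holding the lock.
    private CompletableFuture<Void> terminationFuture;

    void start() {
        synchronized (lock) {
            terminationFuture = new CompletableFuture<>();
        }
    }

    void terminate() {
        synchronized (lock) {
            terminationFuture.complete(null);
        }
    }

    void awaitTermination() {
        CompletableFuture<Void> future;
        synchronized (lock) {   // read the guarded reference under the lock
            future = terminationFuture;
        }
        future.join();          // block outside the lock, so other threads are not stalled
    }

    boolean isTerminated() {
        synchronized (lock) {
            return terminationFuture != null && terminationFuture.isDone();
        }
    }
}
```

Copying the reference out under the lock preserves the `@GuardedBy` contract while avoiding the deadlock risk of calling a blocking `get()`/`join()` inside the synchronized block.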
[GitHub] flink issue #3724: [FLINK-6275] [yarn] Fix unprotected access to resourceMan...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3724 @StephanEwen Thanks. I might close this PR as there is a refactor to ```YarnFlinkApplicationMasterRunner``` by combining with ```AbstractYarnFlinkApplicationMasterRunner``` in one class. See : https://issues.apache.org/jira/browse/FLINK-6351
[jira] [Commented] (FLINK-6428) Add support DISTINCT in dataStream SQL
[ https://issues.apache.org/jira/browse/FLINK-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995971#comment-15995971 ] ASF GitHub Bot commented on FLINK-6428: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3817 [FLINK-6428][table] Add support DISTINCT in dataStream SQL With the changes of this PR, the `DISTINCT` keyword is supported in the `SELECT` clause of DataStream SQL. In standard databases there are two situations in which the `DISTINCT` keyword can be used: * in the `SELECT` clause, e.g.: `SELECT DISTINCT name FROM table` * in an aggregate, e.g.: `COUNT([ALL|DISTINCT] expression)`, `SUM([ALL|DISTINCT] expression)`, etc. In this JIRA we address the `SELECT` clause. As the number of elements grows, the limiting factor tends to be the back-end storage (Flink state). In theory, external storage is effectively unbounded (the user can control and provision it); from this point of view, DISTINCT over an unbounded stream can be supported. In addition, with external storage such as RocksDB, the user can set a TTL according to the actual amount of business data to ensure that the external storage keeps working properly.
- [x] General - The pull request references the related JIRA issue ("[FLINK-6428][table] Add support DISTINCT in dataStream SQL") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-6428-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3817.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3817 commit f1aaffd765f05f8b51801ae1bb66426d99480970 Author: sunjincheng121 Date: 2017-05-03T03:47:09Z [FLINK-6428][table] Add support DISTINCT in dataStream SQL > Add support DISTINCT in dataStream SQL > -- > > Key: FLINK-6428 > URL: https://issues.apache.org/jira/browse/FLINK-6428 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Add support DISTINCT in dataStream SQL as follows: > DATA: > {code} > (name, age) > (kevin, 28), > (sunny, 6), > (jack, 6) > {code} > SQL: > {code} > SELECT DISTINCT age FROM MyTable > {code} > RESULTS: > {code} > 28, 6 > {code} > To DataStream: > {code} > inputDS > .keyBy() // KeyBy on all fields > .flatMap() // Eliminate duplicate data > {code} > [~fhueske] do we need this feature?
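The keyBy-plus-deduplicate translation sketched in the issue can be mimicked with plain collections: key on the projected field and emit each key only once. An illustrative sketch only; a real streaming job would keep the dedup set in keyed Flink state rather than in memory:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DistinctDemo {
    // Emulates SELECT DISTINCT age FROM MyTable: project the field,
    // then suppress duplicates while preserving first-seen order.
    static List<Integer> distinctAges(List<Map.Entry<String, Integer>> rows) {
        Set<Integer> seen = new LinkedHashSet<>();   // stands in for the dedup state
        for (Map.Entry<String, Integer> row : rows) {
            seen.add(row.getValue());
        }
        return new ArrayList<>(seen);
    }
}
```

For the sample data (kevin, 28), (sunny, 6), (jack, 6) this yields 28, 6, matching the expected result in the issue.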
[GitHub] flink pull request #3817: [FLINK-6428][table] Add support DISTINCT in dataSt...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3817 [FLINK-6428][table] Add support DISTINCT in dataStream SQL With the changes of this PR, the `DISTINCT` keyword is supported in the `SELECT` clause of DataStream SQL. In standard databases there are two situations in which the `DISTINCT` keyword can be used: * in the `SELECT` clause, e.g.: `SELECT DISTINCT name FROM table` * in an aggregate, e.g.: `COUNT([ALL|DISTINCT] expression)`, `SUM([ALL|DISTINCT] expression)`, etc. In this JIRA we address the `SELECT` clause. As the number of elements grows, the limiting factor tends to be the back-end storage (Flink state). In theory, external storage is effectively unbounded (the user can control and provision it); from this point of view, DISTINCT over an unbounded stream can be supported. In addition, with external storage such as RocksDB, the user can set a TTL according to the actual amount of business data to ensure that the external storage keeps working properly. - [x] General - The pull request references the related JIRA issue ("[FLINK-6428][table] Add support DISTINCT in dataStream SQL") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-6428-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3817.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3817 commit f1aaffd765f05f8b51801ae1bb66426d99480970
Author: sunjincheng121 Date: 2017-05-03T03:47:09Z [FLINK-6428][table] Add support DISTINCT in dataStream SQL
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995858#comment-15995858 ] Bill Liu commented on FLINK-4022: - any update on this feature? will it be released in 1.3? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of the partitions it is currently responsible > for to an external store (or Kafka) before a rebalance; after the rebalance, > when the subtasks get the new partitions they'll be reading from, they'll read > from the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). 
> The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find the last offset for the partitions they are holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to the previous global state and therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to subtasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. 
For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics.
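The selection step behind subscribing to "topic-n*" can be shown with the standard regex API alone. This sketch filters a topic list the way `KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)` selects matching topics (an illustration only; no Kafka involved):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

class TopicMatcher {
    // Keeps only the topics whose full name matches the subscription pattern.
    static List<String> matchingTopics(Collection<String> allTopics, Pattern subscription) {
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (subscription.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

New topics such as topic-n3 are picked up simply by re-running the match against the current topic list, which is what makes pattern subscription amenable to dynamic partition discovery.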
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995668#comment-15995668 ] ASF GitHub Bot commented on FLINK-5906: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114651484
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
@@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation {
     new SqlAvgAggFunction(AVG)
   }
 }
+
+case class UDAGGFunctionCall(
+    aggregateFunction: AggregateFunction[_, _],
+    args: Seq[Expression])
+  extends Aggregation {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  // Override makeCopy method in TreeNode, to produce vargars properly
+  override def makeCopy(args: Array[AnyRef]): this.type = {
+    if (args.length < 1) {
+      throw new TableException("Invalid constructor params")
+    }
+    val agg = args.head.asInstanceOf[AggregateFunction[_, _]]
+    val arg = args.last.asInstanceOf[Seq[Expression]]
+    new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type]
+  }
+
+  override def resultType: TypeInformation[_] = TypeExtractor.createTypeInfo(
+    aggregateFunction, classOf[AggregateFunction[_, _]], aggregateFunction.getClass, 0)
+
+  override def validateInput(): ValidationResult = {
+    val signature = children.map(_.resultType)
+    // look for a signature that matches the input types
+    val foundSignature = getAccumulateMethodSignature(aggregateFunction, signature)
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+        s"Actual: ${signatureToString(signature)} \n" +
+        s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}")
--- End diff --
The signature string includes the Accumulator, which should be removed.
> Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang >
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114651484
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
@@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation {
     new SqlAvgAggFunction(AVG)
   }
 }
+
+case class UDAGGFunctionCall(
+    aggregateFunction: AggregateFunction[_, _],
+    args: Seq[Expression])
+  extends Aggregation {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  // Override makeCopy method in TreeNode, to produce vargars properly
+  override def makeCopy(args: Array[AnyRef]): this.type = {
+    if (args.length < 1) {
+      throw new TableException("Invalid constructor params")
+    }
+    val agg = args.head.asInstanceOf[AggregateFunction[_, _]]
+    val arg = args.last.asInstanceOf[Seq[Expression]]
+    new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type]
+  }
+
+  override def resultType: TypeInformation[_] = TypeExtractor.createTypeInfo(
+    aggregateFunction, classOf[AggregateFunction[_, _]], aggregateFunction.getClass, 0)
+
+  override def validateInput(): ValidationResult = {
+    val signature = children.map(_.resultType)
+    // look for a signature that matches the input types
+    val foundSignature = getAccumulateMethodSignature(aggregateFunction, signature)
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+        s"Actual: ${signatureToString(signature)} \n" +
+        s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}")
--- End diff --
The signature string includes the Accumulator, which should be removed.
[jira] [Closed] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6334. Resolution: Fixed Fixed for 1.3 with c969237fce4fe5394e1cfdbb1186db6d73d0 > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Blocker > Fix For: 1.3.0 > > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table)
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995546#comment-15995546 ] ASF GitHub Bot commented on FLINK-6334: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3791
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3791
[jira] [Created] (FLINK-6440) Noisy logs from metric fetcher
Stephan Ewen created FLINK-6440: --- Summary: Noisy logs from metric fetcher Key: FLINK-6440 URL: https://issues.apache.org/jira/browse/FLINK-6440 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.3.0 Reporter: Stephan Ewen Priority: Critical Fix For: 1.3.0
In cases where TaskManagers fail, the web frontend in the Job Manager starts logging the exception below every few seconds. I labeled this as critical, because it actually makes debugging in such a situation complicated through a log that is flooded with noise.
{code}
2017-05-03 19:37:07,823 WARN org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching metrics failed.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@herman:52175/user/MetricQueryService_136f717a6b91e248282cb2937d22088c]] after [1 ms]
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
	at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
	at java.lang.Thread.run(Thread.java:745)
{code}
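One common mitigation for a log flooded with the same recurring warning (not necessarily the approach Flink adopted for this ticket) is to rate-limit the message and count the suppressed repeats. A minimal sketch:

```java
class ThrottledLogger {
    private final long intervalMillis;
    private long lastLogged = Long.MIN_VALUE;
    private long suppressed = 0;

    ThrottledLogger(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true if a message arriving at nowMillis should actually be emitted;
    // otherwise it is counted as suppressed until the interval elapses.
    boolean shouldLog(long nowMillis) {
        if (lastLogged == Long.MIN_VALUE || nowMillis - lastLogged >= intervalMillis) {
            lastLogged = nowMillis;
            suppressed = 0;
            return true;
        }
        suppressed++;
        return false;
    }

    long suppressedCount() {
        return suppressed;
    }
}
```

Emitting one warning per interval, together with a "suppressed N similar messages" note, keeps the failure visible without drowning out the rest of the log.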
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995253#comment-15995253 ] ASF GitHub Bot commented on FLINK-5906: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114602957
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
     registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+    * Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param f The AggregateFunction to register.
+    * @tparam T The type of the output value.
+    * @tparam ACC The type of aggregate accumulator.
+    */
+  def registerFunction[T, ACC](
+      name: String,
+      f: AggregateFunction[T, ACC])
+    : Unit = {
+    implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --
Well, in the cases here, we are not really interested in the TypeInformation itself but the type class. So it would not really matter if you get Row.class from the `GenericTypeInfo` or the `RowTypeInfo`. Nonetheless, I think it is a good idea to have this optional method. It would also be consistent with the other user-defined functions.
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114602957
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
@@ -178,4 +178,24 @@ class BatchTableEnvironment(
     registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+    * Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param f The AggregateFunction to register.
+    * @tparam T The type of the output value.
+    * @tparam ACC The type of aggregate accumulator.
+    */
+  def registerFunction[T, ACC](
+      name: String,
+      f: AggregateFunction[T, ACC])
+    : Unit = {
+    implicit val typeInfo: TypeInformation[T] = TypeExtractor
--- End diff --
Well, in the cases here, we are not really interested in the TypeInformation itself but the type class. So it would not really matter if you get Row.class from the `GenericTypeInfo` or the `RowTypeInfo`. Nonetheless, I think it is a good idea to have this optional method. It would also be consistent with the other user-defined functions.
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995244#comment-15995244 ] ASF GitHub Bot commented on FLINK-6334: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114590134
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -93,6 +103,11 @@ class Table(
    * }}}
    */
  def select(fields: Expression*): Table = {
+    if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
--- End diff --
I think we can add these checks without touching all methods of `Table`. We could implement a method that recursively traverses a `LogicalNode` and checks if one of its children is an unbounded table function call. This check is performed in the constructor of `Table` and throws an exception unless the `logicalNode` itself is a `LogicalTableFunctionCall` (this is the case if it was created with the new constructor or `as()` was applied on it). That way we can remove all checks in the methods.
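The recursive check suggested in the review comment (walk the `LogicalNode` tree and look for an unbounded table function call among the descendants) is a plain tree traversal. A sketch with a hypothetical `PlanNode` class standing in for Flink's `LogicalNode`:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a logical plan node; real Flink nodes are Scala case classes.
class PlanNode {
    final String kind;
    final List<PlanNode> children;

    PlanNode(String kind, PlanNode... children) {
        this.kind = kind;
        this.children = Arrays.asList(children);
    }

    // Recursively checks whether any node in this subtree is a table function call.
    static boolean containsTableFunctionCall(PlanNode node) {
        if (node.kind.equals("TableFunctionCall")) {
            return true;
        }
        for (PlanNode child : node.children) {
            if (containsTableFunctionCall(child)) {
                return true;
            }
        }
        return false;
    }
}
```

Run once where the plan is constructed, a single traversal like this replaces scattered per-method checks, which is the point of the review comment.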
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995241#comment-15995241 ] ASF GitHub Bot commented on FLINK-6334: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114558126
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -358,4 +360,45 @@ object UserDefinedFunctionUtils {
     InstantiationUtil
       .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
   }
+
+  /**
+    * this method is used for create a [[LogicalTableFunctionCall]]
+    * @param tableEnv
+    * @param udtf a String represent a TableFunction Call e.g "split(c)"
+    * @return
+    */
+  def createLogicalFunctionCall(tableEnv: TableEnvironment, udtf: String) = {
--- End diff --
Add return type to method
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995242#comment-15995242 ] ASF GitHub Bot commented on FLINK-6334: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114553215
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -49,11 +49,11 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
-import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Call, Expression, ExpressionParser, TableFunctionCall, UnresolvedFieldReference}
--- End diff --
Remove the unnecessary imports
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995243#comment-15995243 ] ASF GitHub Bot commented on FLINK-6334: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114556553
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---
@@ -35,6 +36,10 @@ class TableConversions(table: Table) {
   /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
   def toDataSet[T: TypeInformation]: DataSet[T] = {
+    if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(table)) {
--- End diff --
We should put this check into `Table.getRelNode`. This will catch all cases when we try to translate a table.
[jira] [Commented] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995245#comment-15995245 ] ASF GitHub Bot commented on FLINK-6334: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114590580
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
@@ -417,13 +507,45 @@ class Table(
   }
 
   private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
+    if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
+      throw new ValidationException(
+        "TableFunctions can only be followed by Alias. e.g table.join(split('c) as ('a, 'b))")
+    }
+
+    // check that the TableEnvironment of right table is not null
+    // and right table belongs to the same TableEnvironment
+    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
--- End diff --
I would change this method as follows: first we check if `right` is a table function call. If no, we translate the join as before. Otherwise, we translate it as a join with a table function.
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114553215 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -49,11 +49,11 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} -import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} +import org.apache.flink.table.expressions.{Alias, Call, Expression, ExpressionParser, TableFunctionCall, UnresolvedFieldReference} --- End diff -- Remove the unnecessary imports --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114558126 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -358,4 +360,45 @@ object UserDefinedFunctionUtils { InstantiationUtil .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader) } + + /** +* this method is used for create a [[LogicalTableFunctionCall]] +* @param tableEnv +* @param udtf a String represent a TableFunction Call e.g "split(c)" +* @return +*/ + def createLogicalFunctionCall(tableEnv: TableEnvironment, udtf: String) = { --- End diff -- Add return type to method --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3791#discussion_r114590134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -93,6 +103,11 @@ class Table( * }}} */ def select(fields: Expression*): Table = { +if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) { --- End diff -- I think we can add these checks without touching all methods of `Table`. We could implement a method that recursively traverses a `LogicalNode` and checks if one of its children is an unbounded table function call. This check is performed in the constructor of `Table` and throws an exception unless the `logicalNode` itself is a `LogicalTableFunctionCall` (this is the case if it was created with the new constructor or `as()` was applied on it). That way we can remove all checks in the methods.
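The single constructor-time check fhueske proposes could look roughly like the sketch below. This is a hedged illustration, not Flink code: `Node`, the `tableFunctionCall` flag, and the traversal helper are stand-ins for Flink's `LogicalNode` tree.

```java
import java.util.Arrays;
import java.util.List;

final class LogicalNodeCheck {

    /** Minimal stand-in for a logical-plan node; names are assumptions. */
    static final class Node {
        final boolean tableFunctionCall;
        final List<Node> children;

        Node(boolean tableFunctionCall, Node... children) {
            this.tableFunctionCall = tableFunctionCall;
            this.children = Arrays.asList(children);
        }
    }

    /**
     * True if any strict descendant of node is a table function call.
     * The root itself is deliberately excluded: a root-level call is the
     * legal case (created by the new constructor or followed by as()).
     */
    static boolean childContainsCall(Node node) {
        for (Node child : node.children) {
            if (child.tableFunctionCall || childContainsCall(child)) {
                return true;
            }
        }
        return false;
    }
}
```

Running this check once in the constructor would let every `select`/`join`/... method drop its own guard, as the comment suggests.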
[GitHub] flink issue #3793: flink-6033 Support UNNEST query in the stream SQL API
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3793 Thanks for the PR @suez1224. I think we need to rework this PR a little bit before we can merge it. I saw that you had problems using basic type arrays such as `Integer[]`. I think we should cover this in a separate issue, because adding a type involves changes in multiple files and needs good tests. I would propose to introduce a `LogicalUnnestRule` that converts the logical plan into a logical `FlinkLogicalCorrelate`. This approach would not require a `DataStreamCorrelateUnnestRule` and would work for both batch and streaming. What do you think?
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995194#comment-15995194 ] ASF GitHub Bot commented on FLINK-6013: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r114595396 --- Diff: flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.datadog; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@RunWith(Enclosed.class) +public class DatadogHttpClientTest { + public static class TestApiKey { + @Test(expected = IllegalArgumentException.class) + public void testValidateApiKey() { + new DatadogHttpClient("fake_key"); --- End diff -- Make sense. There're lots of uncertainty why testing integration with 3rd party - you don't want to depend to much on it, but you have to test it somehow. It's hard to find a balance somewhere in between. I'll make changes as you recommended. > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995187#comment-15995187 ] ASF GitHub Bot commented on FLINK-5906: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114593800 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,24 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. +* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param f The AggregateFunction to register. +* @tparam T The type of the output value. +* @tparam ACC The type of aggregate accumulator. +*/ + def registerFunction[T, ACC]( + name: String, + f: AggregateFunction[T, ACC]) + : Unit = { +implicit val typeInfo: TypeInformation[T] = TypeExtractor --- End diff -- Yes, the Row type is a good example for type extraction problems. A user usually doesn't want to use `GenericType`. Other cases need for custom serializers or types with complex generics. > Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6439) Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
Ted Yu created FLINK-6439: - Summary: Unclosed InputStream in OperatorSnapshotUtil#readStateHandle() Key: FLINK-6439 URL: https://issues.apache.org/jira/browse/FLINK-6439 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor {code} FileInputStream in = new FileInputStream(path); DataInputStream dis = new DataInputStream(in); {code} Neither `in` nor `dis` is closed upon return from the method. In writeStateHandle(), the OutputStream should likewise be closed in a finally block.
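The fix the ticket asks for is the standard try-with-resources pattern, sketched below. `readAllBytes` and the file handling are illustrative, not the actual `OperatorSnapshotUtil` code.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

final class SnapshotIo {

    // Both streams are declared as resources, so they are closed (in reverse
    // order) on every exit path, including when readFully() throws.
    static byte[] readAllBytes(String path) throws IOException {
        try (FileInputStream in = new FileInputStream(path);
             DataInputStream dis = new DataInputStream(in)) {
            byte[] data = new byte[(int) new File(path).length()];
            dis.readFully(data);
            return data;
        }
    }
}
```

The same shape applies to the write path: wrapping the `OutputStream` in a try-with-resources block replaces the missing finally.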
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114580123 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend( cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); --- End diff -- Ok, then we can also look at this again later. For now it is ok as is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
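The refactoring in the diff above moves the null check out of `StreamTask` and into the backend: `restore()` must now tolerate a null handle collection. A minimal illustration of that contract (names and return value are assumptions, not Flink's API):

```java
import java.util.Collection;

final class RestoreContract {

    // Returns how many handles were restored; a null or empty collection
    // means "no previous state", so the backend simply starts empty
    // instead of requiring the caller to guard the call.
    static int restore(Collection<String> stateHandles) {
        if (stateHandles == null || stateHandles.isEmpty()) {
            return 0;
        }
        return stateHandles.size();
    }
}
```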
[jira] [Commented] (FLINK-5720) Deprecate "Folding" in all of DataStream API
[ https://issues.apache.org/jira/browse/FLINK-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995134#comment-15995134 ] ASF GitHub Bot commented on FLINK-5720: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3816 [FLINK-5720] Deprecate DataStream#fold() This PR deprecates the various `DataStream#fold()` methods and all related internal classes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 5720_deprecate_folding Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3816.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3816 commit 40fd483992f7820f8e6f7b26cd7e95a6d5f71507 Author: zentol Date: 2017-05-03T13:49:03Z [FLINK-5720] Deprecate DataStream#fold() > Deprecate "Folding" in all of DataStream API > > > Key: FLINK-5720 > URL: https://issues.apache.org/jira/browse/FLINK-5720 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.3.0 > > > Folding is an operation that cannot be done incrementally in a distributed > way and that also cannot be done on merging windows. Now that we have > {{AggregatingState}} and aggregate operations we should deprecate folding in > the APIs and deprecate {{FoldingState}}. > I suggest to remove folding completely in Flink 2.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995105#comment-15995105 ] ASF GitHub Bot commented on FLINK-6364: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114579898 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; + this.operatorIdentifier = operatorIdentifier; + this.keyGroupRange = keyGroupRange; + this.newSstFileNames = newSstFileNames; + this.sstFiles = sstFiles; + this.miscFiles = miscFiles; + this.metaStateHandle = metaStateHandle; + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public Map getSstFiles() { + return sstFiles; + } + + public Map getMiscFiles() { + return miscFiles; 
+ } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard mis
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114581240 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,24 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. +* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param f The AggregateFunction to register. +* @tparam T The type of the output value. +* @tparam ACC The type of aggregate accumulator. +*/ + def registerFunction[T, ACC]( + name: String, + f: AggregateFunction[T, ACC]) + : Unit = { +implicit val typeInfo: TypeInformation[T] = TypeExtractor --- End diff -- The idea would be to always check `getResultType()` first and only use the type extractor if the method is not implemented. So you would not need to enforce a TypeExtractor failure for the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
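The resolution order fhueske describes — honor an explicitly implemented `getResultType()` and only fall back to the `TypeExtractor` when the function does not provide one — can be sketched as below. The `Supplier`-based shape is illustrative and not Flink's actual signatures.

```java
import java.util.function.Supplier;

final class ResultTypeResolution {

    // Prefer the explicitly declared result type; only when the function
    // did not implement getResultType() (modeled here as null) do we fall
    // back to reflective type extraction.
    static String resolve(Supplier<String> declaredResultType,
                          Supplier<String> typeExtractorFallback) {
        String declared = declaredResultType.get();
        return declared != null ? declared : typeExtractorFallback.get();
    }
}
```

With this order, tests for the explicit path never need to force a `TypeExtractor` failure, which is the point of the comment.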
[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3692 Alright, this makes sense. The common denominator is that every dns name will have an optional task placeholder (`_TASK`) and everything else is static, right?
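The interpolated-hostname idea summarized above — a static template with a single task placeholder — amounts to a one-line substitution. The `_TASK` placeholder name comes from the comment; the template format is an assumption for illustration.

```java
final class MesosDnsName {

    // Replace the _TASK placeholder with the concrete task id; the rest of
    // the configured template (e.g. "<framework>.mesos") stays static.
    static String interpolate(String template, String taskId) {
        return template.replace("_TASK", taskId);
    }
}
```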
[jira] [Commented] (FLINK-5974) Support Mesos DNS
[ https://issues.apache.org/jira/browse/FLINK-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995123#comment-15995123 ] ASF GitHub Bot commented on FLINK-5974: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3692 Alright, this makes sense. The common denominator is that every dns name will have an optional task placeholder (`_TASK`) and everything else is static, right? > Support Mesos DNS > - > > Key: FLINK-5974 > URL: https://issues.apache.org/jira/browse/FLINK-5974 > Project: Flink > Issue Type: Improvement > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > > In certain Mesos/DCOS environments, the slave hostnames aren't resolvable. > For this and other reasons, Mesos DNS names would ideally be used for > communication within the Flink cluster, not the hostname discovered via > `InetAddress.getLocalHost`. > Some parts of Flink are already configurable in this respect, notably > `jobmanager.rpc.address`. However, the Mesos AppMaster doesn't use that > setting for everything (e.g. artifact server), it uses the hostname. > Similarly, the `taskmanager.hostname` setting isn't used in Mesos deployment > mode. To effectively use Mesos DNS, the TM should use > `..mesos` as its hostname. This could be derived > from an interpolated configuration string. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5974) Support Mesos DNS
[ https://issues.apache.org/jira/browse/FLINK-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995099#comment-15995099 ] ASF GitHub Bot commented on FLINK-5974: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/3692 @tillrohrmann Mesos DNS is not the only DNS solution for Mesos, it is merely the DCOS solution. By using an interpolated string, the name is fully configurable.
[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995108#comment-15995108 ] ASF GitHub Bot commented on FLINK-6364: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114580123 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend( cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); --- End diff -- Ok, then we can also look at this again later. For now it is ok as is. > Implement incremental checkpointing in RocksDBStateBackend > -- > > Key: FLINK-6364 > URL: https://issues.apache.org/jira/browse/FLINK-6364 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > {{RocksDBStateBackend}} is well suited for incremental checkpointing because > RocksDB is base on LSM trees, which record updates in new sst files and all > sst files are immutable. By only materializing those new sst files, we can > significantly improve the performance of checkpointing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995115#comment-15995115 ] ASF GitHub Bot commented on FLINK-5906: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114581240 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,24 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. +* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param f The AggregateFunction to register. +* @tparam T The type of the output value. +* @tparam ACC The type of aggregate accumulator. +*/ + def registerFunction[T, ACC]( + name: String, + f: AggregateFunction[T, ACC]) + : Unit = { +implicit val typeInfo: TypeInformation[T] = TypeExtractor --- End diff -- The idea would be to always check `getResultType()` first and only use the type extractor if the method is not implemented. So you would not need to enforce a TypeExtractor failure for the tests. > Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
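The reviewer's suggestion — consult a user-supplied `getResultType()` first and fall back to reflective extraction only when it returns nothing — can be sketched like this. The interface and method names are assumptions for illustration, not Flink's actual signatures:

```java
/** Sketch: resolving a function's result type with an explicit override first. */
public class TypeResolution {

    interface AggregateFunctionSketch<T> {
        /** Optional override; returning null means "please infer the type for me". */
        default Class<T> getResultType() {
            return null;
        }
    }

    static <T> Class<T> resolveResultType(AggregateFunctionSketch<T> f, Class<T> extracted) {
        Class<T> explicit = f.getResultType();
        // Prefer the user-provided type; use the (possibly fragile) extractor result otherwise.
        return explicit != null ? explicit : extracted;
    }

    public static void main(String[] args) {
        AggregateFunctionSketch<Long> inferred = new AggregateFunctionSketch<Long>() {};
        AggregateFunctionSketch<Long> explicit = new AggregateFunctionSketch<Long>() {
            @Override
            public Class<Long> getResultType() {
                return Long.class;
            }
        };
        System.out.println(resolveResultType(inferred, Long.class)); // falls back to extraction
        System.out.println(resolveResultType(explicit, Long.class)); // uses the override
    }
}
```

With this ordering, tests do not need to force a type-extractor failure: they can simply provide the override and assert it wins, which is the point fhueske makes above.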
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114579898 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend} + */ +public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final Set newSstFileNames; + + private final Map sstFiles; + + private final Map miscFiles; + + private final StreamStateHandle metaStateHandle; + + private boolean registered; + + RocksDBKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + Set newSstFileNames, + Map sstFiles, + Map miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = jobId; + this.operatorIdentifier = operatorIdentifier; + this.keyGroupRange = keyGroupRange; + this.newSstFileNames = newSstFileNames; + this.sstFiles = sstFiles; + this.miscFiles = miscFiles; + this.metaStateHandle = metaStateHandle; + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public Map getSstFiles() { + return sstFiles; + } + + public Map getMiscFiles() { + return miscFiles; 
+ } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file state.", e); + } + + if (!registered) { + for (String newSstFileName : newSstFileNames) { + StreamStateHandle handle = sstFiles.get(newSstFileName); +
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995101#comment-15995101 ] ASF GitHub Bot commented on FLINK-5906: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114579772 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/DSetUDAGGITCase.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.batch.table + +import java.math.BigDecimal + +import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv} +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset} +import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.functions.aggfunctions.CountAggFunction +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.mockito.Mockito.{mock, when} + +import scala.collection.JavaConverters._ + +/** + * We only test some aggregations until better testing of constructed DataSet + * programs is possible. + */ +@RunWith(classOf[Parameterized]) +class DSetUDAGGITCase(configMode: TableConfigMode) --- End diff -- Sounds good to me. I did not have the UDAGG design across all different aggregation tests, as I feel the current agg tests are a little mess up. It always takes me a while to find the right test cases among all different test files. I put UDAGG test cases into one file which helps me to easily understand what kinds of tests have been covered. I think we need to think about how to reorganize our agg test structure. Considering the short time to freeze feature, let us keep the current structure (I will split the UDAGG into all different agg test files) and massage the tests later. 
> Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6217) ContaineredTaskManagerParameters sets off heap memory size incorrectly
[ https://issues.apache.org/jira/browse/FLINK-6217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995087#comment-15995087 ] ASF GitHub Bot commented on FLINK-6217: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3648 Thanks for your contribution @haohui and the review @StephanEwen. Changes look good to me. Merging this PR. > ContaineredTaskManagerParameters sets off heap memory size incorrectly > -- > > Key: FLINK-6217 > URL: https://issues.apache.org/jira/browse/FLINK-6217 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Haohui Mai >Assignee: Haohui Mai > > Thanks [~bill.liu8904] for triaging the issue. > When {{taskmanager.memory.off-heap}} is disabled, we observed that the total > memory that Flink allocates exceed the total memory of the container: > For a 8G container the JobManager starts the container with the following > parameter: > {noformat} > $JAVA_HOME/bin/java -Xms6072m -Xmx6072m -XX:MaxDirectMemorySize=6072m ... > {noformat} > The total amount of heap memory plus the off-heap memory exceeds the total > amount of memory of the container. As a result YARN occasionally kills the > container. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
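The arithmetic behind this bug is simple: for an 8 GB container, starting the JVM with both `-Xmx6072m` and `-XX:MaxDirectMemorySize=6072m` allows a worst-case footprint of roughly 12 GB, so YARN eventually kills the container. A hedged sketch of a corrected split — the 25% cutoff ratio here is an assumption for illustration, not Flink's exact formula:

```java
/** Sketch: split a container's memory budget between heap and direct memory. */
public class ContainerMemorySplit {

    /**
     * Reserves a cutoff fraction of the container for non-heap needs, gives the
     * heap the remainder, and caps direct memory at the cutoff, so that
     * heap + direct never exceeds the container size.
     */
    static long[] split(long containerMb, double cutoffRatio) {
        long cutoff = (long) (containerMb * cutoffRatio);
        long heap = containerMb - cutoff;
        long direct = cutoff; // cap direct memory at the remaining budget
        return new long[] {heap, direct};
    }

    public static void main(String[] args) {
        long[] sizes = split(8192, 0.25);
        // Heap plus direct memory sums to the container size, unlike the buggy case.
        System.out.println("-Xmx" + sizes[0] + "m -XX:MaxDirectMemorySize=" + sizes[1] + "m");
    }
}
```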
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995080#comment-15995080 ] ASF GitHub Bot commented on FLINK-5906: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114576600 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,24 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. +* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param f The AggregateFunction to register. +* @tparam T The type of the output value. +* @tparam ACC The type of aggregate accumulator. +*/ + def registerFunction[T, ACC]( + name: String, + f: AggregateFunction[T, ACC]) + : Unit = { +implicit val typeInfo: TypeInformation[T] = TypeExtractor --- End diff -- Thanks @twalthr . If I understand you correctly, you suggest to create a contract method `getResultType` for UDAGG, such that user can provide the result type in case the type extraction fails. Sounds good to me? Can you give some examples that when the type extraction will fail (for instance a Row type?) and why it may fail, such that I can add some test cases. > Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5718) Handle JVM Fatal Exceptions in Tasks
[ https://issues.apache.org/jira/browse/FLINK-5718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995049#comment-15995049 ] ASF GitHub Bot commented on FLINK-5718: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3811 Thanks for your contribution @zimmermatt. Changes look good to me. Merging this PR. > Handle JVM Fatal Exceptions in Tasks > > > Key: FLINK-5718 > URL: https://issues.apache.org/jira/browse/FLINK-5718 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The TaskManager catches and handles all types of exceptions right now (all > {{Throwables}}). The intention behind that is: > - Many {{Error}} subclasses are recoverable for the TaskManagers, such as > failure to load/link user code > - We want to give eager notifications to the JobManager in case something > in a task goes wrong. > However, there are some exceptions which should probably simply terminate the > JVM, if caught in the task thread, because they may leave the JVM in a > dysfunctional limbo state: > - {{OutOfMemoryError}} > - {{InternalError}} > - {{UnknownError}} > - {{ZipError}} > These are basically the subclasses of {{VirtualMachineError}}, except for > {{StackOverflowError}}, which is recoverable and usually recovered already by > the time the exception has been thrown and the stack unwound. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
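The classification described in the issue — all `VirtualMachineError` subclasses (`OutOfMemoryError`, `InternalError`, `UnknownError`, and `ZipError`, which extends `InternalError`) are fatal, except `StackOverflowError` — can be expressed compactly. This is a sketch of the rule as stated above, not necessarily Flink's exact utility method:

```java
/** Sketch: classifying JVM-fatal errors per the rule described in FLINK-5718. */
public class FatalErrorCheck {

    /**
     * VirtualMachineError subclasses may leave the JVM in a dysfunctional limbo
     * state, so they should terminate the process. StackOverflowError is the
     * exception: the stack is usually already unwound (and thus recovered)
     * by the time the error is caught.
     */
    static boolean isJvmFatalError(Throwable t) {
        return t instanceof VirtualMachineError && !(t instanceof StackOverflowError);
    }

    public static void main(String[] args) {
        System.out.println(isJvmFatalError(new OutOfMemoryError()));   // true
        System.out.println(isJvmFatalError(new StackOverflowError())); // false
        System.out.println(isJvmFatalError(new RuntimeException()));   // false
    }
}
```

A task thread would check this predicate before its generic `Throwable` handling and call for immediate JVM termination when it returns true, instead of reporting the failure to the JobManager as a recoverable one.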
[jira] [Commented] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995044#comment-15995044 ] ASF GitHub Bot commented on FLINK-5906: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114571910 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -327,4 +332,56 @@ object ProjectionTranslator { } } + /** +* Find and replace UDAGG function Call to UDAGGFunctionCall +* +* @param fieldthe expression to check +* @param tableEnv the TableEnvironment +* @return an expression with correct UDAGGFunctionCall type for UDAGG functions +*/ + def replaceUDAGGFunctionCall(field: Expression, tableEnv: TableEnvironment): Expression = { --- End diff -- We will not have the chance to execute LogicalNode#resolveExpressions before get aggNames, projectFields, etc. I actually tried another alternative approach to conduct the replacement in extractAggregationsAndProperties and replaceAggregationsAndProperties (we have to check and handle the UDAGG call carefully in both functions), it works but I do not like that design. It makes the logic of these two methods not completely clean. Also, in over aggregate it will not call extractAggregationsAndProperties and replaceAggregationsAndProperties. So I decide to implement a separate function to handle the UDAGGFunctionCall replacement. > Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6372) change-scala-version.sh does not change version for flink-gelly-examples
[ https://issues.apache.org/jira/browse/FLINK-6372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995011#comment-15995011 ] ASF GitHub Bot commented on FLINK-6372: --- Github user soniclavier closed the pull request at: https://github.com/apache/flink/pull/3763 > change-scala-version.sh does not change version for flink-gelly-examples > > > Key: FLINK-6372 > URL: https://issues.apache.org/jira/browse/FLINK-6372 > Project: Flink > Issue Type: Bug >Reporter: vishnu viswanath >Assignee: vishnu viswanath > > change-scala-version.sh does not change the version for flink-gelly-examples > in bin.xml. This is causing build to fail if using scala 2.11, since its > looking for flink-gelly-examples_2.10 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995008#comment-15995008 ] ASF GitHub Bot commented on FLINK-6364: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565991 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -769,9 +769,10 @@ public OperatorStateBackend createOperatorStateBackend( cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); --- End diff -- I attempted to put the restore state in the constructor as we discussed. But it turns out impossible. All state backends should be registered in the task so that the backends can be closed when the task is canceled. If we put the restoring in the constructor of the backends, the construction of the backends may be blocked (e.g., due to the access to HDFS). Since the construction is not completed yet, the backend will not be registered and hence will not be closed. > Implement incremental checkpointing in RocksDBStateBackend > -- > > Key: FLINK-6364 > URL: https://issues.apache.org/jira/browse/FLINK-6364 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > {{RocksDBStateBackend}} is well suited for incremental checkpointing because > RocksDB is base on LSM trees, which record updates in new sst files and all > sst files are immutable. 
By only materializing those new sst files, we can > significantly improve the performance of checkpointing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
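The ordering constraint shixiaogang describes — the backend must be registered for closing *before* any potentially blocking restore work starts, or a canceled task cannot interrupt it — can be sketched as follows. Names are illustrative, not Flink's API:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Sketch: register a backend as closeable before running its (blocking) restore. */
public class RegisterThenRestore {

    static final List<Closeable> cancelables = new ArrayList<>();

    static void register(Closeable c) {
        cancelables.add(c);
    }

    static class Backend implements Closeable {
        boolean restored;

        /** May block on remote storage (e.g. HDFS), so it must run after registration. */
        void restore(Collection<String> handles) {
            restored = true;
        }

        @Override
        public void close() {
            // Cancellation path: aborts any in-flight restore.
        }
    }

    public static void main(String[] args) {
        Backend backend = new Backend();
        register(backend);              // 1) make it visible to cancellation first
        backend.restore(List.of("h1")); // 2) only then do the blocking work
        System.out.println(cancelables.size() + " " + backend.restored); // 1 true
    }
}
```

If the restore ran inside the backend's constructor instead, step 1 could never happen before step 2 — which is exactly why moving the restore into the constructor "turns out impossible" in the comment above.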
[jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995007#comment-15995007 ] ASF GitHub Bot commented on FLINK-6364: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565968 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114565968

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateHandle.java ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}
+ */
+public class RocksDBKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateHandle.class);
+
+	private static final long serialVersionUID = -8328808513197388231L;
+
+	private final JobID jobId;
+
+	private final String operatorIdentifier;
+
+	private final KeyGroupRange keyGroupRange;
+
+	private final Set<String> newSstFileNames;
+
+	private final Map<String, StreamStateHandle> sstFiles;
+
+	private final Map<String, StreamStateHandle> miscFiles;
+
+	private final StreamStateHandle metaStateHandle;
+
+	private boolean registered;
+
+	RocksDBKeyedStateHandle(
+			JobID jobId,
+			String operatorIdentifier,
+			KeyGroupRange keyGroupRange,
+			Set<String> newSstFileNames,
+			Map<String, StreamStateHandle> sstFiles,
+			Map<String, StreamStateHandle> miscFiles,
+			StreamStateHandle metaStateHandle) {
+
+		this.jobId = jobId;
+		this.operatorIdentifier = operatorIdentifier;
+		this.keyGroupRange = keyGroupRange;
+		this.newSstFileNames = newSstFileNames;
+		this.sstFiles = sstFiles;
+		this.miscFiles = miscFiles;
+		this.metaStateHandle = metaStateHandle;
+		this.registered = false;
+	}
+
+	@Override
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
+
+	public Map<String, StreamStateHandle> getSstFiles() {
+		return sstFiles;
+	}
+
+	public Map<String, StreamStateHandle> getMiscFiles() {
+		return miscFiles;
+	}
+
+	public StreamStateHandle getMetaStateHandle() {
+		return metaStateHandle;
+	}
+
+	@Override
+	public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+		if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+			return this;
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public void discardState() throws Exception {
+
+		try {
+			metaStateHandle.discardState();
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard meta data.", e);
+		}
+
+		try {
+			StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values());
+		} catch (Exception e) {
+			LOG.warn("Could not properly discard misc file state.", e);
+		}
+
+		if (!registered) {
+			for (String newSstFileName : newSstFileNames) {
+				StreamStateHandle handle = sstFiles.get(newSstFileName);
+
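The `getIntersection` contract in the diff above — return the handle itself when the requested key-group range overlaps the handle's range, `null` otherwise — can be demonstrated with a simplified, self-contained range type. This sketch does not use Flink's actual `KeyGroupRange` class; the class and member names are hypothetical stand-ins for illustration only.

```java
/** Simplified stand-in for Flink's KeyGroupRange (illustration only, not Flink API). */
public class KeyGroupRangeDemo {

    public final int start; // inclusive
    public final int end;   // inclusive

    public KeyGroupRangeDemo(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /** Intersection of two closed integer ranges; empty if they do not overlap. */
    public KeyGroupRangeDemo intersection(KeyGroupRangeDemo other) {
        int s = Math.max(start, other.start);
        int e = Math.min(end, other.end);
        // An empty range is represented by start > end, mirroring the
        // EMPTY_KEY_GROUP_RANGE sentinel checked in the diff above.
        return new KeyGroupRangeDemo(s, e);
    }

    public boolean isEmpty() {
        return start > end;
    }

    public static void main(String[] args) {
        KeyGroupRangeDemo handleRange = new KeyGroupRangeDemo(0, 63);
        // Overlapping request: the state handle would return itself.
        System.out.println(handleRange.intersection(new KeyGroupRangeDemo(32, 95)).isEmpty());  // false
        // Disjoint request: the state handle would return null.
        System.out.println(handleRange.intersection(new KeyGroupRangeDemo(64, 127)).isEmpty()); // true
    }
}
```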
[jira] [Commented] (FLINK-5974) Support Mesos DNS
[ https://issues.apache.org/jira/browse/FLINK-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995000#comment-15995000 ]

ASF GitHub Bot commented on FLINK-5974:
---

Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3692

Why do we actually make the `TaskManager` hostname configurable via `mesos.resourcemanager.tasks.hostname`? Isn't the mesos-dns hostname already uniquely defined by the `TaskID` and the framework name contained in `FrameworkInfo`? If the `FrameworkInfo` does not contain the service name, then we should only specify this information. Then we could rename the configuration parameter into `mesos.service-name` and concatenate `taskId`, `service-name` and `mesos`, where `taskId` is retrieved from the `TaskID` instance.

> Support Mesos DNS
> -
>
> Key: FLINK-5974
> URL: https://issues.apache.org/jira/browse/FLINK-5974
> Project: Flink
> Issue Type: Improvement
> Components: Cluster Management, Mesos
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
>
> In certain Mesos/DCOS environments, the slave hostnames aren't resolvable.
> For this and other reasons, Mesos DNS names would ideally be used for
> communication within the Flink cluster, not the hostname discovered via
> `InetAddress.getLocalHost`.
> Some parts of Flink are already configurable in this respect, notably
> `jobmanager.rpc.address`. However, the Mesos AppMaster doesn't use that
> setting for everything (e.g. artifact server), it uses the hostname.
> Similarly, the `taskmanager.hostname` setting isn't used in Mesos deployment
> mode. To effectively use Mesos DNS, the TM should use
> `..mesos` as its hostname. This could be derived
> from an interpolated configuration string.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3692 Why do we actually make `TaskManager` hostname configurable via `mesos.resourcemanager.tasks.hostname`? Isn't the mesos-dns hostname already uniquely defined by the `TaskID` and the framework name contained in `FrameworkInfo`? If the `FrameworkInfo` does not contain the service name, then we should only specify this information. Then we could rename the configuration parameter into `mesos.service-name` and concatenate `taskId`, `service-name` and `mesos`, where `taskId` is retrieved from the `TaskID` instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
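For illustration, the naming scheme proposed in the comment above — concatenating the task ID, the service name, and the `mesos` domain — could be sketched as follows. The helper class, method name, and example values are assumptions for this sketch, not Flink or Mesos-DNS API:

```java
/** Hypothetical sketch of the proposed Mesos DNS naming scheme (not Flink API). */
public class MesosDnsNaming {

    /**
     * Builds a Mesos DNS hostname of the form "<taskId>.<serviceName>.mesos",
     * following the suggestion that the task ID plus the framework/service
     * name already identify a TaskManager uniquely.
     */
    public static String mesosDnsHostname(String taskId, String serviceName) {
        return taskId + "." + serviceName + ".mesos";
    }

    public static void main(String[] args) {
        // Example values are illustrative only.
        System.out.println(mesosDnsHostname("taskmanager-00001", "flink"));
        // -> taskmanager-00001.flink.mesos
    }
}
```

Under this scheme a single `mesos.service-name` setting would suffice, since the task ID is supplied by Mesos at launch time rather than configured per TaskManager.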
[GitHub] flink issue #3802: Add Evenly Graph Generator to Flink Gelly
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3802

What makes this graph 'evenly'? See `org.apache.flink.graph.drivers.input.CompleteGraph` for creating an input for `Runner` (the default class executed when doing a `/bin/flink run` on the flink-gelly-examples jar; see [usage](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/gelly/index.html)). Also need to update [`graph_generators.md`](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/gelly/graph_generators.html).
[jira] [Assigned] (FLINK-6438) Expand docs home page a little
[ https://issues.apache.org/jira/browse/FLINK-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Anderson reassigned FLINK-6438:
-

Assignee: David Anderson

> Expand docs home page a little
> -
>
> Key: FLINK-6438
> URL: https://issues.apache.org/jira/browse/FLINK-6438
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Reporter: David Anderson
> Assignee: David Anderson
> Priority: Minor
>
> The idea is to improve the documentation home page by adding a few links to
> valuable items that are too easily overlooked.
[jira] [Created] (FLINK-6438) Expand docs home page a little
David Anderson created FLINK-6438:
-

Summary: Expand docs home page a little
Key: FLINK-6438
URL: https://issues.apache.org/jira/browse/FLINK-6438
Project: Flink
Issue Type: Improvement
Components: Documentation
Reporter: David Anderson
Priority: Minor

The idea is to improve the documentation home page by adding a few links to valuable items that are too easily overlooked.
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994987#comment-15994987 ]

ASF GitHub Bot commented on FLINK-5969:
---

Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3778

@zentol Thanks for reviewing! 😃 I merged this also on master, so that we have backwards compatibility tests from 1.2 to the current master, which was the reason for this whole exercise.

> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.3.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0, 1.2.2
>
> We currently only have tests that test migration from 1.1 to 1.3, because we
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
> - {{StatefulUDFSavepointMigrationITCase}}
> - {{*MigrationTest}}
> - {{AbstractKeyedCEPPatternOperator}}
[GitHub] flink issue #3778: [FLINK-5969] Add savepoint backwards compatibility tests ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3778

@zentol Thanks for reviewing! 😃 I merged this also on master, so that we have backwards compatibility tests from 1.2 to the current master, which was the reason for this whole exercise.
[jira] [Closed] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-5969.
---
Resolution: Fixed
Fix Version/s: 1.2.2

Implemented on release-1.2 in:
- 852a710b4d91bdd319238c87d227c51a904070a7
- 53432e0690510aa5838165d4958777d872668be4
- c9ba4f0313346d96db6258b533041da180f5471e
- c89d4b43213d7d3e2ad184787061fe3178be5691
- c43fc2a14fa9ece5f3a80c8f25e6d99a2450f671
- 62601b4978582f05e5fadec83ce18eeec5842686
- 611434c6fa8e53cac25dd93f568d2670ec4ead72
- 52fb578953fb571f5787c8f69fe2ac4525488a3d
- a3ccffcb0a1b3e936fff31a61dddf1cada8ab99b
- f68f6542bb2ba0171883d27c9ac75c74301ddf88

Implemented on master in:
- 821ec80d72308746aed307498be157b58b7b65e9
- fb7793f033cfa0d6d77ef25a6c518a5a203ebb82
- 2c6377f257cafcbaea3a5bf33dd27852ed05afec
- 84eea72295eda5e7289deb5221c7b990b7b65883
- e40f2e184c6e57d4346312f969a64727389e92fa
- 0ecb5d0050b84ba48105836288d43ce4c4749459
- 44e472f6cffd75213c3b7373bd622b734f89f0b0
- 2779197f237446e3bff4e9e15f90c24d721c8ab4
- 9ed98f2e5a32fb14de03b9a8ea1dd45851cc3a7e
- 1882c90505b0d25775b969cc025c8a3087b82f37
- 6f8b3c6158a87c14f2fdb3446092df367131e194
- cced07a28622016ca1ee2d5b316423701c9a986c

> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.3.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0, 1.2.2
>
> We currently only have tests that test migration from 1.1 to 1.3, because we
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
> - {{StatefulUDFSavepointMigrationITCase}}
> - {{*MigrationTest}}
> - {{AbstractKeyedCEPPatternOperator}}
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994974#comment-15994974 ]

ASF GitHub Bot commented on FLINK-5969:
---

Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3778

Manually merged

> Add savepoint backwards compatibility tests from 1.2 to 1.3
> ---
>
> Key: FLINK-5969
> URL: https://issues.apache.org/jira/browse/FLINK-5969
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.3.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.3.0
>
> We currently only have tests that test migration from 1.1 to 1.3, because we
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
> - {{StatefulUDFSavepointMigrationITCase}}
> - {{*MigrationTest}}
> - {{AbstractKeyedCEPPatternOperator}}