[jira] [Commented] (FLINK-7934) Upgrade Calcite dependency to 1.15
[ https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340668#comment-16340668 ]

ASF GitHub Bot commented on FLINK-7934:
---------------------------------------

Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5355

    @bowenli86 The following two features are related to this PR:

    [CALCITE-2016] Make item + dot operators work for array (e.g. SELECT orders[5].color FROM t) (Shuyi Chen)
    [CALCITE-1867] Allow user-defined grouped window functions (Timo Walther)

    Also, Calcite 1.15 added built-in support for simple DDL statements like CREATE and DROP.

> Upgrade Calcite dependency to 1.15
> ----------------------------------
>
> Key: FLINK-7934
> URL: https://issues.apache.org/jira/browse/FLINK-7934
> Project: Flink
> Issue Type: Task
> Components: Table API & SQL
> Reporter: Rong Rong
> Assignee: Shuyi Chen
> Priority: Major
>
> Umbrella issue for all related issues for the Apache Calcite 1.15 release.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #5355: [FLINK-7934][Table & SQL API] Upgrade Flink to use Calcit...
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5355

    @bowenli86 The following two features are related to this PR:

    [CALCITE-2016] Make item + dot operators work for array (e.g. SELECT orders[5].color FROM t) (Shuyi Chen)
    [CALCITE-1867] Allow user-defined grouped window functions (Timo Walther)

    Also, Calcite 1.15 added built-in support for simple DDL statements like CREATE and DROP.

---
[jira] [Commented] (FLINK-8514) move flink-connector-wikiedits to Apache Bahir
[ https://issues.apache.org/jira/browse/FLINK-8514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340667#comment-16340667 ]

Ufuk Celebi commented on FLINK-8514:
------------------------------------

+1

> move flink-connector-wikiedits to Apache Bahir
> ----------------------------------------------
>
> Key: FLINK-8514
> URL: https://issues.apache.org/jira/browse/FLINK-8514
> Project: Flink
> Issue Type: Sub-task
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
>
> I propose moving flink-connector-wikiedits to Apache Bahir given its low
> popularity. We can probably email the community to see if anyone actually
> still uses it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340658#comment-16340658 ]

ASF GitHub Bot commented on FLINK-7923:
---------------------------------------

GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/5367

    [FLINK-7923][Table API & SQL] Support field access of composite array element in SQL

    Note: This is based on FLINK-7934, will rebase once FLINK-7934 is resolved.

    ## What is the purpose of the change

    Support field access of composite array elements in SQL.

    ## Brief change log

    - add handling to the Calcite dot operator to support field access of composite array elements in SQL
    - add unit tests to verify that it works for tuple array, row array, POJO array and case class array

    ## Verifying this change

    This change added tests and can be verified as follows:

    - *Added unit tests to verify the query plan*
    - *Added integration tests for batch/streaming for POJO/case class/tuple/row type*

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (yes)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink flink-7923

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5367

commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen
Date:   2018-01-10T00:52:56Z

    Upgrade to Calcite 1.15

commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen
Date:   2018-01-25T23:36:36Z

    Support access of subfields of Array element if the element is a composite type (e.g. case class, pojo, tuple or row).

> SQL parser exception when accessing subfields of a Composite element in an
> Object Array type column
> --------------------------------------------------------------------------
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Rong Rong
> Assignee: Shuyi Chen
> Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT
>   a[1].f0
> FROM
>   MyTable
> {code}
> will cause problems.
> See the following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/5367

    [FLINK-7923][Table API & SQL] Support field access of composite array element in SQL

    Note: This is based on FLINK-7934, will rebase once FLINK-7934 is resolved.

    ## What is the purpose of the change

    Support field access of composite array elements in SQL.

    ## Brief change log

    - add handling to the Calcite dot operator to support field access of composite array elements in SQL
    - add unit tests to verify that it works for tuple array, row array, POJO array and case class array

    ## Verifying this change

    This change added tests and can be verified as follows:

    - *Added unit tests to verify the query plan*
    - *Added integration tests for batch/streaming for POJO/case class/tuple/row type*

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (yes)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink flink-7923

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5367

commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen
Date:   2018-01-10T00:52:56Z

    Upgrade to Calcite 1.15

commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen
Date:   2018-01-25T23:36:36Z

    Support access of subfields of Array element if the element is a composite type (e.g. case class, pojo, tuple or row).

---
[jira] [Commented] (FLINK-6623) BlobCacheSuccessTest fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340637#comment-16340637 ]

ASF GitHub Bot commented on FLINK-6623:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5351

    Cool, do you want to commit both changes (this PR and the referenced branch) together?

> BlobCacheSuccessTest fails on Windows
> -------------------------------------
>
> Key: FLINK-6623
> URL: https://issues.apache.org/jira/browse/FLINK-6623
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.4.0, 1.5.0
> Environment: windows 10, java 1.8
> Reporter: constantine stanley
> Assignee: Chesnay Schepler
> Priority: Major
>
> All tests in {{BlobCacheSuccessTest}} fail on Windows.
> {code}
> java.nio.file.FileSystemException: C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp- -> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af: The process cannot access the file because it is being used by another process.
> 	at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
> 	at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
> 	at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
> 	at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
> 	at java.nio.file.Files.move(Files.java:1395)
> 	at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464)
> 	at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708)
> 	at org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608)
> 	at org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568)
> 	at org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778)
> 	at org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173)
> 	at org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> 	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> 	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #5351: [FLINK-6623][Blob] BlobServer#putBuffer moves file after ...
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5351

    Cool, do you want to commit both changes (this PR and the referenced branch) together?

---
[GitHub] flink issue #5338: [FLINK-8476][config][HA] Deprecate HA config constants
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5338

    Thanks, good change, +1

---
[jira] [Commented] (FLINK-8476) ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
[ https://issues.apache.org/jira/browse/FLINK-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340631#comment-16340631 ]

ASF GitHub Bot commented on FLINK-8476:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5338

    Thanks, good change, +1

> ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
> --------------------------------------------------
>
> Key: FLINK-8476
> URL: https://issues.apache.org/jira/browse/FLINK-8476
> Project: Flink
> Issue Type: Improvement
> Components: Configuration
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Trivial
> Fix For: 1.5.0
>
> {{ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT}} is unused and should probably
> be deprecated.
> [~till.rohrmann]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5366: [hotfix] improve javadoc and logging of RocksDBKey...
GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/5366

    [hotfix] improve javadoc and logging of RocksDBKeyedStateBackend

    ## What is the purpose of the change

    General improvements on javadoc and logging of RocksDBKeyedStateBackend

    ## Brief change log

    - updated and fixed a few javadoc errors
    - improved logging

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts: none

    ## Documentation: none

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5366.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5366

commit 649da439c9155f780021f7db3637e10823ec5b21
Author: Bowen Li
Date:   2018-01-26T04:59:38Z

    [hotfix] improve javadoc and logging of RocksDBKeyedStateBackend

---
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340564#comment-16340564 ]

Thomas Weise commented on FLINK-8516:
-------------------------------------

Relevant piece of code: [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L594]

{code:java}
public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
		int totalNumberOfConsumerSubtasks,
		int indexOfThisConsumerSubtask) {
	return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
}
{code}

> FlinkKinesisConsumer does not balance shards over subtasks
> ----------------------------------------------------------
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Reporter: Thomas Weise
> Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over
> subtasks round robin. This works as long as shard identifiers are sequential.
> After shards are rebalanced in Kinesis, that may no longer be the case and
> the distribution becomes skewed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
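The skew the issue describes can be illustrated with a small, self-contained sketch. This is not Flink code: the `assign` helper and the integer shard hashes below are hypothetical stand-ins for `StreamShardHandle.hashCode()`, chosen only to show how `abs(hash % parallelism)` balances sequential hashes but collapses when hashes share a common factor with the parallelism (as can happen after resharding).

```java
import java.util.HashMap;
import java.util.Map;

public class ShardAssignmentSketch {

    // Mirrors the quoted assignment logic: a shard goes to subtask abs(hash % parallelism).
    public static int assign(int shardHash, int totalSubtasks) {
        return Math.abs(shardHash % totalSubtasks);
    }

    // Counts how many shards each subtask receives for a given set of shard hashes.
    public static Map<Integer, Integer> distribution(int[] shardHashes, int totalSubtasks) {
        Map<Integer, Integer> perSubtask = new HashMap<>();
        for (int h : shardHashes) {
            perSubtask.merge(assign(h, totalSubtasks), 1, Integer::sum);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Sequential hashes spread evenly: two shards per subtask.
        int[] sequential = {0, 1, 2, 3, 4, 5, 6, 7};
        // After a hypothetical reshard, all hashes are multiples of 4:
        // every shard lands on subtask 0 and the other three sit idle.
        int[] skewed = {4, 8, 12, 16, 20, 24, 28, 32};

        System.out.println(distribution(sequential, parallelism));
        System.out.println(distribution(skewed, parallelism));
    }
}
```

Running the sketch shows the balanced case assigning two shards per subtask, while the skewed case assigns all eight shards to subtask 0.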
[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value
[ https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340559#comment-16340559 ]

ASF GitHub Bot commented on FLINK-8364:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5356

    cc @fhueske @aljoscha

> Add iterator() to ListState which returns empty iterator when it has no value
> -----------------------------------------------------------------------------
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: 1.5.0
>
> Add iterator() to ListState which returns an empty iterator when it has no value.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5356

    cc @fhueske @aljoscha

---
[jira] [Commented] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()
[ https://issues.apache.org/jira/browse/FLINK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340557#comment-16340557 ]

ASF GitHub Bot commented on FLINK-8515:
---------------------------------------

GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/5365

    [FLINK-8515] update RocksDBMapState to replace deprecated remove() with delete()

    ## What is the purpose of the change

    RocksDBMapState is currently using `rocksdb#remove()`, which is deprecated. It should be replaced with `rocksdb#delete()`.

    ## Brief change log

    Update RocksDBMapState to replace deprecated remove() with delete().

    ## Verifying this change

    This change is already covered by existing tests, such as `StateBackendTestBase#testMapState()`.

    ## Does this pull request potentially affect one of the following parts: none

    ## Documentation: none

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-8515

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5365.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5365

commit 8cd47d5e6110e29e4c6c36aea08543048010b9aa
Author: Bowen Li
Date:   2018-01-26T04:25:00Z

    [FLINK-8515] update RocksDBMapState to replace deprecated remove() with delete()

> update RocksDBMapState to replace deprecated remove() with delete()
> -------------------------------------------------------------------
>
> Key: FLINK-8515
> URL: https://issues.apache.org/jira/browse/FLINK-8515
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Minor
> Fix For: 1.5.0
>
> Currently in RocksDBMapState:
> {code:java}
> @Override
> public void remove(UK userKey) throws IOException, RocksDBException {
>     byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
>     backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
> }
> {code}
> remove() is actually deprecated. It should be replaced with delete().

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5365: [FLINK-8515] update RocksDBMapState to replace dep...
GitHub user bowenli86 opened a pull request:

    https://github.com/apache/flink/pull/5365

    [FLINK-8515] update RocksDBMapState to replace deprecated remove() with delete()

    ## What is the purpose of the change

    RocksDBMapState is currently using `rocksdb#remove()`, which is deprecated. It should be replaced with `rocksdb#delete()`.

    ## Brief change log

    Update RocksDBMapState to replace deprecated remove() with delete().

    ## Verifying this change

    This change is already covered by existing tests, such as `StateBackendTestBase#testMapState()`.

    ## Does this pull request potentially affect one of the following parts: none

    ## Documentation: none

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-8515

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5365.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5365

commit 8cd47d5e6110e29e4c6c36aea08543048010b9aa
Author: Bowen Li
Date:   2018-01-26T04:25:00Z

    [FLINK-8515] update RocksDBMapState to replace deprecated remove() with delete()

---
[jira] [Comment Edited] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16304163#comment-16304163 ]

Ted Yu edited comment on FLINK-4534 at 1/26/18 4:27 AM:
--------------------------------------------------------

Can this get more review? Thanks

was (Author: yuzhih...@gmail.com):
lgtm

> Lack of synchronization in BucketingSink#restoreState()
> -------------------------------------------------------
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: mingleizhang
> Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry entry : state.bucketStates.entrySet()) {
>     closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue
> starting at line 752:
> {code}
> Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
> LOG.debug("Moving pending files to final location on restore.");
> for (Long pastCheckpointId : pastCheckpointIds) {
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
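The fix the issue asks for, guarding every iteration over the shared map with the same lock the mutating methods use, can be sketched with a minimal, hypothetical stand-in (the class and field names below are illustrative, not Flink's actual BucketingSink):

```java
import java.util.HashMap;
import java.util.Map;

public class SynchronizedIterationSketch {
    // Shared mutable state, modeled after state.bucketStates in the report.
    private final Map<String, Integer> bucketStates = new HashMap<>();

    public void put(String bucket, int value) {
        // Writers take the map's monitor before mutating.
        synchronized (bucketStates) {
            bucketStates.put(bucket, value);
        }
    }

    // Iteration must hold the same lock as the writers; otherwise a concurrent
    // put() can trigger a ConcurrentModificationException or a stale read,
    // which is exactly the gap described for restoreState() and close().
    public int sumStates() {
        synchronized (bucketStates) {
            int sum = 0;
            for (int v : bucketStates.values()) {
                sum += v;
            }
            return sum;
        }
    }
}
```

The key design point is that the lock object is shared: synchronizing iteration on a different monitor than the writers use would provide no protection at all.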
[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281 ]

Ted Yu edited comment on FLINK-6105 at 1/26/18 4:26 AM:
--------------------------------------------------------

In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:

{code}
try {
    Thread.sleep(500);
} catch (InterruptedException e1) {
    // ignore it
}
{code}

The interrupt status should be restored.

was (Author: yuzhih...@gmail.com):
In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java:

{code}
try {
    Thread.sleep(500);
} catch (InterruptedException e1) {
    // ignore it
}
{code}

> Properly handle InterruptedException in HadoopInputFormatBase
> -------------------------------------------------------------
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Ted Yu
> Assignee: mingleizhang
> Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException
> instead of IOException.
> The following example is from HadoopInputFormatBase:
> {code}
> try {
>     splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>     throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
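The "restore the interrupt status" fix mentioned in the comment follows a standard Java idiom: `Thread.sleep` clears the thread's interrupt flag when it throws `InterruptedException`, so a catch block that swallows the exception silently loses the interrupt. A minimal sketch (the `sleepQuietly` helper is hypothetical, standing in for the quoted RollingSink code):

```java
public class InterruptRestoreSketch {

    // Sleeps without propagating InterruptedException, but preserves the
    // interrupt for callers by re-setting the flag that sleep() cleared.
    public static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of ignoring it.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Simulate an interrupt arriving before the sleep.
        Thread.currentThread().interrupt();
        // sleep() throws immediately (and clears the flag); the catch restores it.
        sleepQuietly(1);
        // The flag is still visible to the caller, so shutdown logic further up
        // the stack can still react to the interrupt.
        System.out.println(Thread.currentThread().isInterrupted()); // prints "true"
    }
}
```

The same idea motivates throwing `InterruptedIOException` from `HadoopInputFormatBase`: the caller should still be able to tell that the failure was an interruption rather than an ordinary I/O error.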
[jira] [Created] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
Thomas Weise created FLINK-8516:
-----------------------------------

Summary: FlinkKinesisConsumer does not balance shards over subtasks
Key: FLINK-8516
URL: https://issues.apache.org/jira/browse/FLINK-8516
Project: Flink
Issue Type: Bug
Components: Kinesis Connector
Reporter: Thomas Weise

The hash code of the shard is used to distribute discovered shards over subtasks round robin. This works as long as shard identifiers are sequential. After shards are rebalanced in Kinesis, that may no longer be the case and the distribution becomes skewed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-8515) update RocksDBMapState to replace deprecated remove() with delete()
Bowen Li created FLINK-8515:
-------------------------------

Summary: update RocksDBMapState to replace deprecated remove() with delete()
Key: FLINK-8515
URL: https://issues.apache.org/jira/browse/FLINK-8515
Project: Flink
Issue Type: Improvement
Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
Fix For: 1.5.0

Currently in RocksDBMapState:

{code:java}
@Override
public void remove(UK userKey) throws IOException, RocksDBException {
    byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
    backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
}
{code}

remove() is actually deprecated. It should be replaced with delete().

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-8514) move flink-connector-wikiedits to Apache Bahir
Bowen Li created FLINK-8514:
-------------------------------

Summary: move flink-connector-wikiedits to Apache Bahir
Key: FLINK-8514
URL: https://issues.apache.org/jira/browse/FLINK-8514
Project: Flink
Issue Type: Sub-task
Reporter: Bowen Li
Assignee: Bowen Li

I propose moving flink-connector-wikiedits to Apache Bahir given its low popularity. We can probably email the community to see if anyone actually still uses it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8448) [flink-mesos] The flink-job jars uploaded in directory 'web.upload.dir' are deleted on flink-scheduler restart
[ https://issues.apache.org/jira/browse/FLINK-8448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340524#comment-16340524 ]

Chesnay Schepler commented on FLINK-8448:
-----------------------------------------

I'm not really into the specifics of flink-mesos, but the directory where jars are stored is cleaned up, and there's no mechanism to retain uploaded jars.

I did think about making this behavior configurable; that would allow people to use a directory as a kind of repository for jars to use. There may be another open issue for this, let's see if I can find it...

> [flink-mesos] The flink-job jars uploaded in directory 'web.upload.dir' are
> deleted on flink-scheduler restart
> ---------------------------------------------------------------------------
>
> Key: FLINK-8448
> URL: https://issues.apache.org/jira/browse/FLINK-8448
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.3.2
> Environment: Flink 1.3 version
> Reporter: Bhumika Bayani
> Priority: Major
>
> Whatever flink-job jars are uploaded from the flink-jobmanager UI, we lose them
> when a flink-mesos-scheduler restart happens.
> Does flink have any mechanism to retain them?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340519#comment-16340519 ]

Chesnay Schepler commented on FLINK-8189:
-----------------------------------------

I'm tempted to close this as "Won't fix"/"Duplicate" due to the discussion in FLINK-4602.

> move flink-statebackend-rocksdb out of flink-contrib
> ----------------------------------------------------
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own state-backend
> module or the {{flink-runtime}} package.
> A few reasons:
> * The RocksDB state backend has been evolving into the standard state backend
> because it can handle much larger state compared to HeapStateBackend. It
> would be better to put it into the /lib package in Flink somehow by default
> * {{flink-contrib}} doesn't show the correct package hierarchy

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340516#comment-16340516 ]

Bowen Li commented on FLINK-3089:
---------------------------------

So TtlDB can only support one TTL per db opening. If you close TtlDB and reopen it, you can specify another TTL. I don't think frequently opening and closing the db is a good idea. Thus, maybe the third important assumption we have to make is that all states that have an expiry must share the same TTL.

> State API Should Support Data Expiration (State TTL)
> ----------------------------------------------------
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Niels Basjes
> Assignee: Bowen Li
> Priority: Major
>
> In some use cases (web analytics) there is a need to have a state per visitor
> on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout,
> like "After 30 minutes of no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind
> of information. But that introduces the buffering effect of the window (which
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I
> can update afterwards.
> This makes it possible to create a map function that assigns the right value
> and that discards the state automatically.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8189:
----------------------------
    Description:
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module or {{flink-runtime}} package.

A few reasons:
* RocksDB state backend has been evolving into the standard state backends because it can handle much larger state compared to HeapStateBackend. It would be better to put it into /lib package in Flink somehow by default
* {{flink-contrib}} doesn't show the correct package hierarchy

  was:
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module or {{flink-runtime}} package.

A few reasons:
* RocksDB state backend has been evolving into the standard state backends because it can handle much larger state compared to HeapStateBackend. It would be better to put it into /lib package in Flink somehow by default
* {{flink-contrib}} doesn't show the correct package hierachy of

> move flink-statebackend-rocksdb out of flink-contrib
> ----------------------------------------------------
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own/state-backend
> module or {{flink-runtime}} package.
> A few reasons:
> * RocksDB state backend has been evolving into the standard state backends
> because it can handle much larger state compared to HeapStateBackend. It
> would be better to put it into /lib package in Flink somehow by default
> * {{flink-contrib}} doesn't show the correct package hierarchy

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8189:
----------------------------
    Description:
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module or {{flink-runtime}} package.

A few reasons:
* RocksDB state backend has been evolving into the standard state backends because it can handle much larger state compared to HeapStateBackend. It would be better to put it into /lib package in Flink somehow by default
* {{flink-contrib}} doesn't show the correct package hierachy of

  was:
Move {{flink-statebackend-rocksdb}} into probably its own/state-backend module or {{flink-runtime}} package.

> move flink-statebackend-rocksdb out of flink-contrib
> ----------------------------------------------------
>
> Key: FLINK-8189
> URL: https://issues.apache.org/jira/browse/FLINK-8189
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
>
> Move {{flink-statebackend-rocksdb}} into probably its own/state-backend
> module or {{flink-runtime}} package.
> A few reasons:
> * RocksDB state backend has been evolving into the standard state backends
> because it can handle much larger state compared to HeapStateBackend. It
> would be better to put it into /lib package in Flink somehow by default
> * {{flink-contrib}} doesn't show the correct package hierachy of

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8189) move flink-statebackend-rocksdb out of flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-8189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340436#comment-16340436 ] Bowen Li commented on FLINK-8189: - [~till.rohrmann] [~aljoscha] [~srichter] Hi guys, I'm thinking of starting this task since we've finished removing {{flink-contrib/flink-streaming-contrib}}. I'm leaning towards moving {{flink-statebackend-rocksdb}} into {{flink-runtime}} so that it can sit with {{HeapStateBackend}}. What do you think? > move flink-statebackend-rocksdb out of flink-contrib > > > Key: FLINK-8189 > URL: https://issues.apache.org/jira/browse/FLINK-8189 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > > Move {{flink-statebackend-rocksdb}} into probably its own/state-backend > module or {{flink-runtime}} package. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos
[ https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340240#comment-16340240 ] ASF GitHub Bot commented on FLINK-8490: --- Github user joerg84 commented on the issue: https://github.com/apache/flink/pull/5346 @tillrohrmann Could you take a final look? > Allow custom docker parameters for docker tasks on Mesos > > > Key: FLINK-8490 > URL: https://issues.apache.org/jira/browse/FLINK-8490 > Project: Flink > Issue Type: Improvement > Components: Mesos >Reporter: Jörg Schad >Priority: Major > > It would be great to pass custom parameters to Mesos when using the Docker > Containerizer. > This could be similar to this spark example: > `spark.mesos.executor.docker.parameters privileged=true` > > Originally brought up here: > https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5346: [FLINK-8490] [mesos] Allow custom docker parameters for d...
Github user joerg84 commented on the issue: https://github.com/apache/flink/pull/5346 @tillrohrmann Could you take a final look? ---
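The feature under review boils down to forwarding free-form key=value pairs through to the Docker containerizer, as in the quoted Spark example `spark.mesos.executor.docker.parameters privileged=true`. As a sketch only — the class name and the comma-separated format below are assumptions for illustration, not Flink's actual configuration API — parsing such a parameter string might look like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DockerParametersSketch {

    // Parses e.g. "privileged=true,cap-add=SYS_ADMIN" into ordered key/value
    // pairs. In this simplified format, values must not contain ',' or '='.
    public static Map<String, String> parse(String spec) {
        Map<String, String> params = new LinkedHashMap<>();
        if (spec == null || spec.isEmpty()) {
            return params;
        }
        for (String pair : spec.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length != 2 || kv[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Expected key=value, got: " + pair);
            }
            params.put(kv[0].trim(), kv[1].trim());
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(parse("privileged=true,cap-add=SYS_ADMIN"));
    }
}
```

Each resulting entry would then be attached to the Mesos `ContainerInfo.DockerInfo` as one `Parameter`; the parsing itself is the only part sketched here.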
[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
[ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16340113#comment-16340113 ] ASF GitHub Bot commented on FLINK-7386: --- Github user cjolif commented on the issue: https://github.com/apache/flink/pull/4675 FYI I rebased it and got working results on a sample of mine. > Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ > client > > > Key: FLINK-7386 > URL: https://issues.apache.org/jira/browse/FLINK-7386 > Project: Flink > Issue Type: Improvement > Components: ElasticSearch Connector >Reporter: Dawid Wysakowicz >Assignee: Fang Yong >Priority: Critical > Fix For: 1.5.0 > > > In Elasticsearch 5.2.0 client the class {{BulkProcessor}} was refactored and > has no longer the method {{add(ActionRequest)}}. > For more info see: https://github.com/elastic/elasticsearch/pull/20109 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #4675: [FLINK-7386] Fix Elasticsearch 5 connector is not compati...
Github user cjolif commented on the issue: https://github.com/apache/flink/pull/4675 FYI I rebased it and got working results on a sample of mine. ---
[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key
[ https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339853#comment-16339853 ] ASF GitHub Bot commented on FLINK-8267: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5301 cc @tzulitai > update Kinesis Producer example for setting Region key > -- > > Key: FLINK-8267 > URL: https://issues.apache.org/jira/browse/FLINK-8267 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Dyana Rose >Assignee: Bowen Li >Priority: Minor > > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer > In the example code for the kinesis producer the region key is set like: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > {code} > However, the AWS Kinesis Producer Library requires that the region key be > Region > (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269) > so the setting at this point should be: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > producerConfig.put("Region", "us-east-1"); > {code} > When you run the Kinesis Producer you can see the effect of not setting the > Region key by a log line > {noformat} > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Started > Kinesis producer instance for region '' > {noformat} > The KPL also then assumes it's running on EC2 and attempts to determine its > own region, which fails. > {noformat} > (EC2MetadataClient)Http request to Ec2MetadataService failed. > [error] [main.cc:266] Could not configure the region. It was not given in the > config and we were unable to retrieve it from EC2 metadata > {noformat} > At the least I'd say the documentation should mention the difference between > these two keys and when you are required to also set the Region key. 
> On the other hand, is this even the intended behaviour of this connector? Was > it intended that the AWSConfigConstants.AWS_REGION key also set the region of > the Kinesis stream? The documentation for the example states > {noformat} > The example demonstrates producing a single Kinesis stream in the AWS region > “us-east-1”. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
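The workaround described in the issue can be captured in one helper. A minimal sketch, assuming the connector's `AWSConfigConstants.AWS_REGION` constant resolves to the string `"aws.region"` (an assumption; check the connector source) and that the producer accepts a plain `Properties` configuration:

```java
import java.util.Properties;

public class KinesisRegionConfig {

    // Assumed value of AWSConfigConstants.AWS_REGION — verify against the connector.
    public static final String AWS_REGION = "aws.region";

    // Set both the Flink connector's region key and the KPL's own "Region" key,
    // so the KPL does not fall back to the EC2 metadata lookup described above.
    public static Properties producerConfig(String region) {
        Properties config = new Properties();
        config.put(AWS_REGION, region);
        config.put("Region", region);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig("us-east-1"));
    }
}
```

Until the documentation clarifies which key the KPL actually reads, setting both keys to the same value is the safe choice.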
[GitHub] flink issue #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Producer ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5301 cc @tzulitai ---
[GitHub] flink pull request #5149: [FLINK-7858][flp6] Port JobVertexTaskManagersHandl...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5149 ---
[jira] [Resolved] (FLINK-8224) Should shutdownApplication when job terminated in job mode
[ https://issues.apache.org/jira/browse/FLINK-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8224. -- Resolution: Fixed Fix Version/s: 1.5.0 Fixed via a4ecc7ffe4ba16a68de06c1053c7916e6082b413 > Should shutdownApplication when job terminated in job mode > - > > Key: FLINK-8224 > URL: https://issues.apache.org/jira/browse/FLINK-8224 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > For job mode, one job is an application. When the job finishes, it should tell > the resource manager to shut down the application; otherwise the resource > manager cannot set the application status. For example, if the YARN resource > manager doesn't report the application as finished to the YARN master, YARN > will consider the application as still running. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7858) Port JobVertexTaskManagersHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339808#comment-16339808 ] ASF GitHub Bot commented on FLINK-7858: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5149 > Port JobVertexTaskManagersHandler to REST endpoint > -- > > Key: FLINK-7858 > URL: https://issues.apache.org/jira/browse/FLINK-7858 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Fix For: 1.5.0 > > > Port JobVertexTaskManagersHandler to REST endpoint -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5170: [FLINK-8266] [runtime] add network memory to Resou...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5170 ---
[jira] [Resolved] (FLINK-7858) Port JobVertexTaskManagersHandler to REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7858. -- Resolution: Fixed Fix Version/s: 1.5.0 Fixed via 056c72af994bc0b7bd838faff6b2991763fc2ac1 37b4e2cef687160f2bc7cedb7d2360825089569e > Port JobVertexTaskManagersHandler to REST endpoint > -- > > Key: FLINK-7858 > URL: https://issues.apache.org/jira/browse/FLINK-7858 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Fix For: 1.5.0 > > > Port JobVertexTaskManagersHandler to REST endpoint -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8224) Should shutdownApplication when job terminated in job mode
[ https://issues.apache.org/jira/browse/FLINK-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339806#comment-16339806 ] ASF GitHub Bot commented on FLINK-8224: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5139 > Should shutdownApplication when job terminated in job mode > - > > Key: FLINK-8224 > URL: https://issues.apache.org/jira/browse/FLINK-8224 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > For job mode, one job is an application. When the job finishes, it should tell > the resource manager to shut down the application; otherwise the resource > manager cannot set the application status. For example, if the YARN resource > manager doesn't report the application as finished to the YARN master, YARN > will consider the application as still running. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8266) Add network memory to ResourceProfile
[ https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8266. -- Resolution: Fixed Fix Version/s: 1.5.0 Fixed via 47e6069d7a299c02a81f062a7acb6a792b71c146 > Add network memory to ResourceProfile > - > > Key: FLINK-8266 > URL: https://issues.apache.org/jira/browse/FLINK-8266 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > On the task manager side, the network buffer pool should be allocated according > to the number of input channels and output sub-partitions, but when allocating a > worker, the resource profile doesn't contain information about this memory. > So I suggest adding a network memory field to ResourceProfile; the job master > should calculate it when scheduling a task, and then the resource manager can > allocate a container with the resource profile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8266) Add network memory to ResourceProfile
[ https://issues.apache.org/jira/browse/FLINK-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339807#comment-16339807 ] ASF GitHub Bot commented on FLINK-8266: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5170 > Add network memory to ResourceProfile > - > > Key: FLINK-8266 > URL: https://issues.apache.org/jira/browse/FLINK-8266 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > On the task manager side, the network buffer pool should be allocated according > to the number of input channels and output sub-partitions, but when allocating a > worker, the resource profile doesn't contain information about this memory. > So I suggest adding a network memory field to ResourceProfile; the job master > should calculate it when scheduling a task, and then the resource manager can > allocate a container with the resource profile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
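The proposal amounts to carrying one more dimension in the resource vector. A simplified sketch — field and method names assumed for illustration, not the actual `ResourceProfile` class — of a profile that includes network memory and the matching check a resource manager would apply when deciding whether an offered container can host a task:

```java
public class ResourceProfileSketch {
    final double cpuCores;
    final int heapMemoryMB;
    final int networkMemoryMB; // the proposed new dimension

    public ResourceProfileSketch(double cpuCores, int heapMemoryMB, int networkMemoryMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.networkMemoryMB = networkMemoryMB;
    }

    // A container profile matches a request only if every dimension,
    // now including network memory, is large enough.
    public boolean isMatching(ResourceProfileSketch required) {
        return cpuCores >= required.cpuCores
            && heapMemoryMB >= required.heapMemoryMB
            && networkMemoryMB >= required.networkMemoryMB;
    }

    // The job master could estimate network memory from the task's channel
    // counts, e.g. channels * buffersPerChannel * segmentSize (illustrative formula).
    public static int estimateNetworkMemoryMB(int numChannels, int buffersPerChannel, int segmentKB) {
        return numChannels * buffersPerChannel * segmentKB / 1024;
    }
}
```

With such a field, the job master computes the estimate at scheduling time and the resource manager allocates a container whose profile satisfies `isMatching`.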
[GitHub] flink pull request #5139: [FLINK-8224] [runtime] shutdown application when j...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5139 ---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163938277

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
+
+    /** The name of the state, as registered by the user. */
+    private final String name;
+
+    /** The mode how elements in this state are assigned to tasks during restore. */
+    private final OperatorStateHandle.Mode assignmentMode;
+
+    /** The type serializer for the keys in the map state. */
+    private final TypeSerializer<K> keySerializer;
+
+    /** The type serializer for the values in the map state. */
+    private final TypeSerializer<V> valueSerializer;
+
+    public RegisteredBroadcastBackendStateMetaInfo(
+            final String name,
+            final OperatorStateHandle.Mode assignmentMode,
+            final TypeSerializer<K> keySerializer,
+            final TypeSerializer<V> valueSerializer) {
+
+        Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+        this.name = Preconditions.checkNotNull(name);
+        this.assignmentMode = assignmentMode;
+        this.keySerializer = Preconditions.checkNotNull(keySerializer);
+        this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    public TypeSerializer<V> getValueSerializer() {
+        return valueSerializer;
+    }
+
+    public OperatorStateHandle.Mode getAssignmentMode() {
+        return assignmentMode;
+    }
+
+    public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() {
+        return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+                name,
+                assignmentMode,
+                keySerializer.duplicate(),
+                valueSerializer.duplicate(),
+                keySerializer.snapshotConfiguration(),
+                valueSerializer.snapshotConfiguration());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+            return false;
+        }
+
+        final RegisteredBroadcastBackendStateMetaInfo<?, ?> other =
+                (RegisteredBroadcastBackendStateMetaInfo<?, ?>) obj;
+
+        return Objects.equals(name, other.getName())
+                && Objects.equals(assignmentMode, other.getAssignmentMode())
+                && Objects.equals(keySerializer, other.getKeySerializer())
+                && Objects.equals(valueSerializer, other.getValueSerializer());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name.hashCode();
+        result = 31 * result + assignmentMode.hashCode();
+        result = 31 * result + keySerializer.hashCode();
+        result = 31 * result + valueSerializer.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163933180

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -77,16 +90,29 @@
 	public void read(DataInputView in) throws IOException {
 		super.read(in);

 		int numKvStates = in.readShort();
-		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+		operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
 		for (int i = 0; i < numKvStates; i++) {
-			stateMetaInfoSnapshots.add(
-				OperatorBackendStateMetaInfoSnapshotReaderWriters
-					.getReaderForVersion(getReadVersion(), userCodeClassLoader)
-					.readStateMetaInfo(in));
+			operatorStateMetaInfoSnapshots.add(
+				OperatorBackendStateMetaInfoSnapshotReaderWriters
+					.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+					.readOperatorStateMetaInfo(in));
 		}
+
+		int numBroadcastStates = in.readShort();
--- End diff --

This here (and onwards) would fail if we're reading older version savepoints, because there was nothing written for this before.
---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163936390

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java ---
@@ -36,8 +36,9 @@
 	 * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore.
 	 */
 	public enum Mode {
-		SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each.
-		BROADCAST // The operator state partitions are broadcast to all task.
+		SPLIT_DISTRIBUTE,  // The operator state partitions in the state handle are split and distributed to one task each.
+		BROADCAST,         // The operator state partitions are broadcasted to all tasks.
+		UNIFORM_BROADCAST  // The operator states are identical, and they are broadcasted to all tasks.
--- End diff --

nit: can we either keep with spaces here, or at least tab them so that the 3 comments are aligned?
---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163938102

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
+
+    /** The name of the state, as registered by the user. */
+    private final String name;
+
+    /** The mode how elements in this state are assigned to tasks during restore. */
+    private final OperatorStateHandle.Mode assignmentMode;
+
+    /** The type serializer for the keys in the map state. */
+    private final TypeSerializer<K> keySerializer;
+
+    /** The type serializer for the values in the map state. */
+    private final TypeSerializer<V> valueSerializer;
+
+    public RegisteredBroadcastBackendStateMetaInfo(
+            final String name,
+            final OperatorStateHandle.Mode assignmentMode,
+            final TypeSerializer<K> keySerializer,
+            final TypeSerializer<V> valueSerializer) {
+
+        Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+        this.name = Preconditions.checkNotNull(name);
+        this.assignmentMode = assignmentMode;
+        this.keySerializer = Preconditions.checkNotNull(keySerializer);
+        this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    public TypeSerializer<V> getValueSerializer() {
+        return valueSerializer;
+    }
+
+    public OperatorStateHandle.Mode getAssignmentMode() {
+        return assignmentMode;
+    }
+
+    public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() {
+        return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+                name,
+                assignmentMode,
+                keySerializer.duplicate(),
+                valueSerializer.duplicate(),
+                keySerializer.snapshotConfiguration(),
+                valueSerializer.snapshotConfiguration());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+            return false;
+        }
+
+        final RegisteredBroadcastBackendStateMetaInfo<?, ?> other =
+                (RegisteredBroadcastBackendStateMetaInfo<?, ?>) obj;
+
+        return Objects.equals(name, other.getName())
+                && Objects.equals(assignmentMode, other.getAssignmentMode())
+                && Objects.equals(keySerializer, other.getKeySerializer())
+                && Objects.equals(valueSerializer, other.getValueSerializer());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name.hashCode();
+        result = 31 * result + assignmentMode.hashCode();
+        result = 31 * result + keySerializer.hashCode();
+        result = 31 * result + valueSerializer.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163935065

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -77,16 +90,29 @@
 	public void read(DataInputView in) throws IOException {
 		super.read(in);

 		int numKvStates = in.readShort();
-		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+		operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
 		for (int i = 0; i < numKvStates; i++) {
-			stateMetaInfoSnapshots.add(
-				OperatorBackendStateMetaInfoSnapshotReaderWriters
-					.getReaderForVersion(getReadVersion(), userCodeClassLoader)
-					.readStateMetaInfo(in));
+			operatorStateMetaInfoSnapshots.add(
+				OperatorBackendStateMetaInfoSnapshotReaderWriters
+					.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+					.readOperatorStateMetaInfo(in));
 		}
+
+		int numBroadcastStates = in.readShort();
--- End diff --

One straightforward way to fix this is to uptick the current `VERSION` to 3, and here you do:
```
if (getReadVersion() >= 3) {
    // read broadcast state stuff
}
```
so we only try to read broadcast state stuff if the written version in the savepoint is larger or equal to 3.
---
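The version-gating pattern suggested in this review comment can be shown end to end. A stripped-down sketch — not the real `OperatorBackendSerializationProxy`, just two counts behind a version header — where reading the broadcast section is conditional on the version that was actually written:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedProxySketch {
    static final int VERSION = 3; // upticked from 2 when the broadcast section was added

    // Current-format writer: version header, operator state count, broadcast state count.
    public static byte[] write(int numOperatorStates, int numBroadcastStates) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(VERSION);
            out.writeShort(numOperatorStates);
            out.writeShort(numBroadcastStates); // section that exists only since version 3
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Old-format writer (version 2): no broadcast section at all.
    public static byte[] writeV2(int numOperatorStates) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(2);
            out.writeShort(numOperatorStates);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns {numOperatorStates, numBroadcastStates}. The broadcast count is
    // read only when the written version says it is there, so old savepoints
    // don't trigger a read past the end of the stream.
    public static int[] read(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int readVersion = in.readInt();
            int numOperatorStates = in.readShort();
            int numBroadcastStates = (readVersion >= 3) ? in.readShort() : 0;
            return new int[] {numOperatorStates, numBroadcastStates};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the `readVersion >= 3` guard, reading a version-2 stream would attempt an extra `readShort()` that was never written.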
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163935632

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---
@@ -211,4 +297,34 @@ public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
 			return stateMetaInfo;
 		}
 	}
+
+	public static class BroadcastStateMetaInfoReaderV2<K, V> extends AbstractBroadcastStateMetaInfoReader<K, V> {
+
+		public BroadcastStateMetaInfoReaderV2(final ClassLoader userCodeClassLoader) {
+			super(userCodeClassLoader);
+		}
+
+		@Override
+		public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
+			RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> stateMetaInfo =
+				new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
+
+			stateMetaInfo.setName(in.readUTF());
+			stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig =
+				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
--- End diff --

If the `writeSerializersAndConfigsWithResilience` call was a single one in the writer, then here you can also just get all written serializers and configs with a single `readSerializersAndConfigsWithResilience`. The returned list would be length 2 (order of the key / value serializer + config will be the same as how you wrote them).
---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163932401

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -64,11 +70,18 @@
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);

-		out.writeShort(stateMetaInfoSnapshots.size());
-		for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState : stateMetaInfoSnapshots) {
+		out.writeShort(operatorStateMetaInfoSnapshots.size());
+		for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState : operatorStateMetaInfoSnapshots) {
+			OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getOperatorStateWriterForVersion(VERSION, kvState)
+				.writeOperatorStateMetaInfo(out);
+		}
+
+		out.writeShort(broadcastStateMetaInfoSnapshots.size());
+		for (RegisteredBroadcastBackendStateMetaInfo.Snapshot kvState : broadcastStateMetaInfoSnapshots) {
--- End diff --

same here: the naming of the `kvState` variable here is actually a bit odd
---
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339693#comment-16339693 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163932401

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -64,11 +70,18 @@
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);

-		out.writeShort(stateMetaInfoSnapshots.size());
-		for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState : stateMetaInfoSnapshots) {
+		out.writeShort(operatorStateMetaInfoSnapshots.size());
+		for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState : operatorStateMetaInfoSnapshots) {
+			OperatorBackendStateMetaInfoSnapshotReaderWriters
+				.getOperatorStateWriterForVersion(VERSION, kvState)
+				.writeOperatorStateMetaInfo(out);
+		}
+
+		out.writeShort(broadcastStateMetaInfoSnapshots.size());
+		for (RegisteredBroadcastBackendStateMetaInfo.Snapshot kvState : broadcastStateMetaInfoSnapshots) {
--- End diff --

same here: the naming of the `kvState` variable here is actually a bit odd

> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Fix For: 1.5.0
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339698#comment-16339698 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163938102

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
+
+    /** The name of the state, as registered by the user. */
+    private final String name;
+
+    /** The mode how elements in this state are assigned to tasks during restore. */
+    private final OperatorStateHandle.Mode assignmentMode;
+
+    /** The type serializer for the keys in the map state. */
+    private final TypeSerializer<K> keySerializer;
+
+    /** The type serializer for the values in the map state. */
+    private final TypeSerializer<V> valueSerializer;
+
+    public RegisteredBroadcastBackendStateMetaInfo(
+            final String name,
+            final OperatorStateHandle.Mode assignmentMode,
+            final TypeSerializer<K> keySerializer,
+            final TypeSerializer<V> valueSerializer) {
+
+        Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+        this.name = Preconditions.checkNotNull(name);
+        this.assignmentMode = assignmentMode;
+        this.keySerializer = Preconditions.checkNotNull(keySerializer);
+        this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public TypeSerializer<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    public TypeSerializer<V> getValueSerializer() {
+        return valueSerializer;
+    }
+
+    public OperatorStateHandle.Mode getAssignmentMode() {
+        return assignmentMode;
+    }
+
+    public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() {
+        return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+                name,
+                assignmentMode,
+                keySerializer.duplicate(),
+                valueSerializer.duplicate(),
+                keySerializer.snapshotConfiguration(),
+                valueSerializer.snapshotConfiguration());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+            return false;
+        }
+
+        final RegisteredBroadcastBackendStateMetaInfo<?, ?> other =
+                (RegisteredBroadcastBackendStateMetaInfo<?, ?>) obj;
+
+        return Objects.equals(name, other.getName())
+                && Objects.equals(assignmentMode, other.getAssignmentMode())
+                && Objects.equals(keySerializer, other.getKeySerializer())
+                && Objects.equals(valueSerializer, other.getValueSerializer());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name.hashCode();
+        result = 31 * result + assignmentMode.hashCode();
+        result = 31 * result +
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163942745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException { super.read(in); int numKvStates = in.readShort(); - stateMetaInfoSnapshots = new ArrayList<>(numKvStates); + operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates); for (int i = 0; i < numKvStates; i++) { - stateMetaInfoSnapshots.add( - OperatorBackendStateMetaInfoSnapshotReaderWriters - .getReaderForVersion(getReadVersion(), userCodeClassLoader) - .readStateMetaInfo(in)); + operatorStateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader) + .readOperatorStateMetaInfo(in)); } + + int numBroadcastStates = in.readShort(); --- End diff -- I think somehow the migration test cases are not failing here, only because `in.readShort()` happens to return 0. ---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163932956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -35,18 +35,24 @@ public static final int VERSION = 2; --- End diff -- It seems like the `OperatorBackendSerializationProxy` will have new binary formats after this change. This should then have an uptick in the VERSION. In general, I think the PR currently does not have any migration paths for previous versions (where there is no broadcast state meta info written). ---
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339695#comment-16339695 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163933180 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException { super.read(in); int numKvStates = in.readShort(); - stateMetaInfoSnapshots = new ArrayList<>(numKvStates); + operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates); for (int i = 0; i < numKvStates; i++) { - stateMetaInfoSnapshots.add( - OperatorBackendStateMetaInfoSnapshotReaderWriters - .getReaderForVersion(getReadVersion(), userCodeClassLoader) - .readStateMetaInfo(in)); + operatorStateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader) + .readOperatorStateMetaInfo(in)); } + + int numBroadcastStates = in.readShort(); --- End diff -- This here (and onwards) would fail if we're reading older version savepoints, because there was nothing written for this before. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339697#comment-16339697 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163935065 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException { super.read(in); int numKvStates = in.readShort(); - stateMetaInfoSnapshots = new ArrayList<>(numKvStates); + operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates); for (int i = 0; i < numKvStates; i++) { - stateMetaInfoSnapshots.add( - OperatorBackendStateMetaInfoSnapshotReaderWriters - .getReaderForVersion(getReadVersion(), userCodeClassLoader) - .readStateMetaInfo(in)); + operatorStateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader) + .readOperatorStateMetaInfo(in)); } + + int numBroadcastStates = in.readShort(); --- End diff -- One straightforward way to fix this is to uptick the current `VERSION` to 3, and here you do: ``` if (getReadVersion() >= 3) { // read broadcast state stuff } ``` so we only try to read broadcast state stuff if the written version in the savepoint is greater than or equal to 3. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
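The version-gated read path suggested in the review comment above can be sketched in miniature. The class and method names below (`VersionedProxyDemo`, `write`, `read`) are illustrative stand-ins, not Flink's actual `OperatorBackendSerializationProxy`; the sketch only shows the guarding pattern with plain `java.io` streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedProxyDemo {

    static final int VERSION = 3;

    // Writer: the broadcast-state section only exists from version 3 onwards.
    static byte[] write(int version, short numBroadcastStates) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(version);
            out.writeShort(0);                      // numKvStates (empty in this sketch)
            if (version >= 3) {
                out.writeShort(numBroadcastStates); // the new v3 section
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reader: only consume the new section when the written version has it,
    // so old savepoints still restore instead of reading past their data.
    static int read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int readVersion = in.readInt();
            int numKvStates = in.readShort();       // pre-existing section
            int numBroadcastStates = 0;
            if (readVersion >= 3) {                 // "read broadcast state stuff" only for v3+
                numBroadcastStates = in.readShort();
            }
            return numBroadcastStates;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write(2, (short) 0))); // old format: section absent, 0
        System.out.println(read(write(3, (short) 5))); // new format: section read, 5
    }
}
```

The key point is that the guard lives on the read side: the writer always emits the current `VERSION`, while the reader branches on the version it finds in the data.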
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339691#comment-16339691 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163932340 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -64,11 +70,18 @@ public int getVersion() { public void write(DataOutputView out) throws IOException { super.write(out); - out.writeShort(stateMetaInfoSnapshots.size()); - for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState : stateMetaInfoSnapshots) { + out.writeShort(operatorStateMetaInfoSnapshots.size()); + for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState : operatorStateMetaInfoSnapshots) { --- End diff -- the naming of the `kvState` variable here is actually a bit odd > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163934364 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java --- @@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) throws IOException { } } + public static class BroadcastStateMetaInfoWriterV2<K, V> extends AbstractBroadcastStateMetaInfoWriter<K, V> { + + public BroadcastStateMetaInfoWriterV2( + final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) { + super(broadcastStateMetaInfo); + } + + @Override + public void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException { + out.writeUTF(broadcastStateMetaInfo.getName()); + out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal()); + + // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Collections.singletonList(new Tuple2<>( + broadcastStateMetaInfo.getKeySerializer(), + broadcastStateMetaInfo.getKeySerializerConfigSnapshot()))); + + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( --- End diff -- Combining these two `writeSerializersAndConfigsWithResilience` calls into one call, with a single list containing both the key serializer and value serializer, would be more space-efficient in the written data: ``` TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( out, Arrays.asList( Tuple2.of(keySerializer, keySerializerConfig), Tuple2.of(valueSerializer, valueSerializerConfig))); ``` ---
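Why the single combined call is more space-efficient: each resilient write emits its own list header, so two calls with singleton lists carry a redundant header compared to one call with a two-element list. A minimal sketch of that effect with plain `DataOutputStream` (the strings stand in for the serialized serializer/config pairs; this is not Flink's actual `TypeSerializerSerializationUtil`, whose per-list framing is richer):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

public class CombinedWriteDemo {

    // Each call writes a list header (a short for the size) followed by the
    // entries, mimicking one "write with resilience" invocation.
    static byte[] writeList(List<String> entries) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeShort(entries.size());
            for (String e : entries) {
                out.writeUTF(e);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two calls, one singleton list each: two headers.
        int twoCalls = writeList(List.of("keySerializerAndConfig")).length
                     + writeList(List.of("valueSerializerAndConfig")).length;
        // One call, one list with both entries: a single header.
        int oneCall = writeList(List.of("keySerializerAndConfig", "valueSerializerAndConfig")).length;
        System.out.println(twoCalls - oneCall); // the duplicated per-list overhead, in bytes
    }
}
```

In this toy framing the saving is just the duplicated size prefix; with the real utility's heavier per-list framing, the saving per state entry is correspondingly larger.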
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339692#comment-16339692 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163930707 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -137,7 +155,12 @@ public ExecutionConfig getExecutionConfig() { @Override public Set getRegisteredStateNames() { - return registeredStates.keySet(); + Set stateNames = new HashSet<>( --- End diff -- Might not make sense to have a new `HashSet` every time `getRegisteredStateNames` is called. OTOH, would it make sense to have a separate `getRegisteredBroadcastStateNames` on the interface? > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
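The trade-off raised in this review comment can be illustrated with a small sketch. The field and method names below are illustrative, not `DefaultOperatorStateBackend`'s actual internals: one accessor allocates a fresh union set per call, the alternative accessor exposes the broadcast names as a cheap unmodifiable view.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StateNamesDemo {

    final Map<String, Object> registeredStates = new HashMap<>();
    final Map<String, Object> registeredBroadcastStates = new HashMap<>();

    // Variant discussed in the review: builds (and allocates) a new HashSet
    // holding the union of both key sets on every call.
    Set<String> getRegisteredStateNames() {
        Set<String> names = new HashSet<>(registeredStates.keySet());
        names.addAll(registeredBroadcastStates.keySet());
        return names;
    }

    // Alternative floated in the comment: a separate accessor for broadcast
    // state names, returning an unmodifiable view instead of a copy.
    Set<String> getRegisteredBroadcastStateNames() {
        return Collections.unmodifiableSet(registeredBroadcastStates.keySet());
    }

    public static void main(String[] args) {
        StateNamesDemo backend = new StateNamesDemo();
        backend.registeredStates.put("buffered-elements", new Object());
        backend.registeredBroadcastStates.put("rules", new Object());
        System.out.println(backend.getRegisteredStateNames());          // union of both maps
        System.out.println(backend.getRegisteredBroadcastStateNames()); // broadcast names only
    }
}
```

The view-returning accessor avoids the per-call allocation, at the cost of widening the interface; which way to go is exactly the question the reviewer leaves open.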
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339700#comment-16339700 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163936390 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java --- @@ -36,8 +36,9 @@ * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore. */ public enum Mode { - SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. - BROADCAST // The operator state partitions are broadcast to all task. + SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. + BROADCAST, // The operator state partitions are broadcasted to all tasks. + UNIFORM_BROADCAST // The operator states are identical, and they are broadcasted to all tasks. --- End diff -- nit: can we either keep with spaces here, or at least tab them so that the 3 comments are aligned? > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339696#comment-16339696 ] ASF GitHub Bot commented on FLINK-8345: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163935632 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java --- @@ -211,4 +297,34 @@ public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) { return stateMetaInfo; } } + + public static class BroadcastStateMetaInfoReaderV2<K, V> extends AbstractBroadcastStateMetaInfoReader<K, V> { + + public BroadcastStateMetaInfoReaderV2(final ClassLoader userCodeClassLoader) { + super(userCodeClassLoader); + } + + @Override + public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException { + RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> stateMetaInfo = + new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(); + + stateMetaInfo.setName(in.readUTF()); + stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]); + + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0); --- End diff -- If the `writeSerializersAndConfigsWithResilience` call was a single one in the writer, then here you can also just get all written serializers and configs with a single `readSerializersAndConfigsWithResilience`. The returned list would be length 2 (order of the key / value serializer + config will be the same as how you wrote them). > Iterate over keyed state on broadcast side of connect with broadcast. 
> - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
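The read-side half of that suggestion can be sketched in miniature: a single length-prefixed list round-trips with write order preserved, so index 0 is the key-side pair and index 1 the value-side pair. As before, plain strings stand in for the serializer/config pairs, and `ReadBackDemo` is an illustrative stand-in, not Flink's actual `readSerializersAndConfigsWithResilience`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class ReadBackDemo {

    // Write one length-prefixed list of entries.
    static byte[] write(List<String> entries) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeShort(entries.size());
            for (String e : entries) {
                out.writeUTF(e);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the list back; entries come out in exactly the order they were
    // written, which is what lets the caller rely on get(0) / get(1).
    static List<String> read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readShort();
            List<String> entries = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                entries.add(in.readUTF());
            }
            return entries;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> roundTrip = read(write(List.of("keySerializerAndConfig", "valueSerializerAndConfig")));
        System.out.println(roundTrip.get(0)); // keySerializerAndConfig
        System.out.println(roundTrip.get(1)); // valueSerializerAndConfig
    }
}
```

Writer and reader must agree on both the list shape and the entry order; that coupling is why the reviewer suggests changing the two sides together.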
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163936903 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java --- @@ -36,8 +36,9 @@ * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore. */ public enum Mode { - SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. - BROADCAST // The operator state partitions are broadcast to all task. + SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. + BROADCAST, // The operator state partitions are broadcasted to all tasks. --- End diff -- Maybe naming this mode `BROADCAST` was not ideal in the first place (perhaps `UNION`, to correspond to the API name, would be better). Looking at the name / comments alone between `BROADCAST` and `UNIFORM_BROADCAST` is actually quite confusing. ---
[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chris snow updated FLINK-8513: -- Description: It would be useful if the documentation provided information on connecting to non-AWS S3 endpoints when using presto. For example: You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's {{flink-conf.yaml}}: {code:java} s3.access-key: your-access-key s3.secret-key: your-secret-key{code} If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 endpoint in Flink's {{flink-conf.yaml}}: {code:java} s3.endpoint: your-endpoint-hostname{code} Source: [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md] was: It would be useful if the documentation provided information on connecting to non-AWS S3 endpoints when using presto. For example: You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's {{flink-conf.yaml}}: {{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++ If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 endpoint in Flink's {{flink-conf.yaml}}: {{s3.endpoint: your-endpoint-hostname }} Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: chris snow >Priority: Trivial > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using presto. 
For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {code:java} > s3.access-key: your-access-key > s3.secret-key: your-secret-key{code} > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {code:java} > s3.endpoint: your-endpoint-hostname{code} > > > Source: > [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chris snow updated FLINK-8513: -- Description: It would be useful if the documentation provided information on connecting to non-AWS S3 endpoints when using presto. For example: You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's {{flink-conf.yaml}}: {{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++ If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 endpoint in Flink's {{flink-conf.yaml}}: {{s3.endpoint: your-endpoint-hostname }} Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md was:It would be useful if the documentation provided information on connecting to non-AWS S3 endpoints when using presto. > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: chris snow >Priority: Trivial > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using presto. For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {{s3.access-key: your-access-key s3.secret-key: your-secret-key }}++ > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {{s3.endpoint: your-endpoint-hostname }} > > > Source: https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
chris snow created FLINK-8513: - Summary: Add documentation for connecting to non-AWS S3 endpoints Key: FLINK-8513 URL: https://issues.apache.org/jira/browse/FLINK-8513 Project: Flink Issue Type: Improvement Components: Documentation Reporter: chris snow It would be useful if the documentation provided information on connecting to non-AWS S3 endpoints when using Presto.
[jira] [Comment Edited] (FLINK-8506) fullRestarts Gauge not incremented when jobmanager got killed
[ https://issues.apache.org/jira/browse/FLINK-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339560#comment-16339560 ] Steven Zhen Wu edited comment on FLINK-8506 at 1/25/18 6:45 PM: Till, thanks for the explanation. Looks like we should clarify the doc, which says "since job submitted". [https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html] fullRestarts The total number of full restarts since this job was submitted (in milliseconds). Gauge So it seems that we don't have any metric to capture jobmanager failover. was (Author: stevenz3wu): Till, thanks for the explanation. Looks like we should clarify the doc, which says "since job submitted". [https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html] fullRestarts The total number of full restarts since this job was submitted (in milliseconds). Gauge So it seems that we don't any metric to capture jobmanager failover. > fullRestarts Gauge not incremented when jobmanager got killed > - > > Key: FLINK-8506 > URL: https://issues.apache.org/jira/browse/FLINK-8506 > Project: Flink > Issue Type: Bug >Reporter: Steven Zhen Wu >Priority: Major > > [~till.rohrmann] When the jobmanager node got killed, it will cause a job restart. > But in this case, we didn't see the _fullRestarts_ gauge get incremented. Is this > expected or a bug?
[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 Is it acceptable behavior that sometimes graphs don't get deleted from disk? ---
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339594#comment-16339594 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 Is it acceptable behavior that sometimes graphs don't get deleted from disk? > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in-memory > instances once they grow too big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user-defined time.
[GitHub] flink pull request #5311: [FLINK-8454] [flip6] Remove JobExecutionResultCach...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5311#discussion_r163923585 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -100,8 +97,6 @@ private final LeaderElectionService leaderElectionService; - private final JobExecutionResultCache jobExecutionResultCache = new JobExecutionResultCache(); --- End diff -- `JobExecutionResultCache` and `JobExecutionResultCacheTest` should also be removed in this commit ---
[jira] [Commented] (FLINK-8454) Remove JobExecutionResultCache
[ https://issues.apache.org/jira/browse/FLINK-8454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339591#comment-16339591 ] ASF GitHub Bot commented on FLINK-8454: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5311#discussion_r163923585 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -100,8 +97,6 @@ private final LeaderElectionService leaderElectionService; - private final JobExecutionResultCache jobExecutionResultCache = new JobExecutionResultCache(); --- End diff -- `JobExecutionResultCache` and `JobExecutionResultCacheTest` should also be removed in this commit > Remove JobExecutionResultCache > -- > > Key: FLINK-8454 > URL: https://issues.apache.org/jira/browse/FLINK-8454 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > With the introduction of the {{SerializableExecutionGraphStore}} to the > {{Dispatcher}}, it is no longer necessary to store the {{JobResult}} in the > {{Dispatcher}}, because all information necessary to derive the {{JobResult}} > is contained in the {{SerializableExecutionGraphStore}}. In order to decrease > complexity, I propose to remove the {{JobExecutionResultCache}}.
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339570#comment-16339570 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163918048 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); --- End diff -- It looks like a constant, i.e., it shouldn't be mutable. 
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
    Collections.unmodifiableList(
        Arrays.stream(JobStatus.values())
            .filter(JobStatus::isGloballyTerminalState)
            .collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo.
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919241 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); --- End diff -- With `ThreadLocalRandom.current().nextInt(...)` you already have an available random instance which does not suffer from lock contention problems. ---
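The review comment above suggests `ThreadLocalRandom.current()` over a shared static `Random`. As a minimal standalone sketch of that pattern (not Flink code; the class and method names here are illustrative only):

```java
import java.util.concurrent.ThreadLocalRandom;

public class RandomChoice {

    // Picks a random element from the given array. ThreadLocalRandom.current()
    // returns a per-thread generator, so concurrent callers never contend on a
    // single shared Random instance's internal seed.
    static <T> T randomElement(T[] values) {
        return values[ThreadLocalRandom.current().nextInt(values.length)];
    }

    public static void main(String[] args) {
        String[] statuses = {"FINISHED", "FAILED", "CANCELED"};
        System.out.println(randomElement(statuses));
    }
}
```

A static `private static final Random RANDOM = new Random()` still works in single-threaded tests; the `ThreadLocalRandom` form simply removes the shared mutable state for free.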
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339572#comment-16339572 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919241 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); --- End diff -- With `ThreadLocalRandom.current().nextInt(...)` you already have an available random instance which does not suffer from lock contention problems. 
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163918048 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); --- End diff -- It looks like a constant, i.e., it shouldn't be mutable. 
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
    Collections.unmodifiableList(
        Arrays.stream(JobStatus.values())
            .filter(JobStatus::isGloballyTerminalState)
            .collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo. ---
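The suggested pattern above can be demonstrated in a self-contained form. The `JobStatus` enum below is a stand-in modeled on Flink's (the real one lives in `org.apache.flink.runtime.jobgraph`); the point is that the constant is built once, immutably, with no `@BeforeClass` mutation step:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class TerminalStatuses {

    // Illustrative stand-in for Flink's JobStatus enum and its
    // isGloballyTerminalState() predicate.
    enum JobStatus {
        RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

        private final boolean globallyTerminal;

        JobStatus(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    // Built once at class-initialization time; the unmodifiable wrapper makes
    // accidental mutation fail fast with UnsupportedOperationException.
    static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
        Collections.unmodifiableList(
            Arrays.stream(JobStatus.values())
                .filter(JobStatus::isGloballyTerminalState)
                .collect(Collectors.toList()));

    public static void main(String[] args) {
        System.out.println(GLOBALLY_TERMINAL_JOB_STATUS);
    }
}
```

Compared with filling a mutable `ArrayList` in `@BeforeClass`, this keeps the constant truly constant and derives it from the enum itself, so a newly added terminal status cannot be forgotten.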
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339573#comment-16339573 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163922121 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339569#comment-16339569 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919921 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339571#comment-16339571 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163915273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. 
+ */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache<JobID, JobDetails> jobDetailsCache; + + private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache; + + private final ScheduledFuture<?> cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( +
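The class above pairs an on-disk store with a bounded in-memory cache of recently used execution graphs. The underlying pattern can be sketched with only the JDK (all names here are illustrative, not Flink's; the real class uses a Guava `LoadingCache` with a byte-size weigher rather than a hand-rolled LRU):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of the store-plus-cache pattern: every entry is persisted (a Map
 * stands in for files on disk), and a size-bounded, access-ordered cache
 * serves the most recently used entries.
 */
public class LruBackedStore {
    private final Map<String, String> disk = new HashMap<>();
    private final int maxCached;
    private final LinkedHashMap<String, String> cache;

    public LruBackedStore(int maxCached) {
        this.maxCached = maxCached;
        // access-order LinkedHashMap evicts the eldest entry once the bound is exceeded
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > LruBackedStore.this.maxCached;
            }
        };
    }

    public void put(String jobId, String graph) {
        disk.put(jobId, graph);   // always persisted
        cache.put(jobId, graph);  // hot copy, may be evicted later
    }

    public String get(String jobId) {
        String hit = cache.get(jobId);
        if (hit != null) {
            return hit;
        }
        String loaded = disk.get(jobId); // cache miss: reload from "disk"
        if (loaded != null) {
            cache.put(jobId, loaded);
        }
        return loaded;
    }

    public int cachedCount() {
        return cache.size();
    }
}
```

Evicting from the cache never loses data, because every entry remains on disk and is reloaded on the next miss.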
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339567#comment-16339567 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163921620 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.util.Preconditions; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Simple {@link ScheduledExecutor} implementation for testing purposes. 
+ */ +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor { + + private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>(); + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return insertRunnable(command, false); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return insertRunnable(command, true); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return insertRunnable(command, true); + } + + /** +* Triggers all registered tasks. +*/ + public void triggerScheduledTasks() { + final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator(); + + while (iterator.hasNext()) { + final ScheduledTask<?> scheduledTask = iterator.next(); + + scheduledTask.execute(); + + if (!scheduledTask.isPeriodic) { + iterator.remove(); + } + } + } + + private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) { + final ScheduledTask<Object> scheduledTask = new ScheduledTask<>( + () -> { + command.run(); + return null; + }, + isPeriodic); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + private static final class ScheduledTask<T> implements ScheduledFuture<T> { + + private final Callable<T> callable; + + private final boolean isPeriodic; + + private final CompletableFuture<T> result; + + private ScheduledTask(Callable<T> callable, boolean isPeriodic) { + this.callable = Preconditions.checkNotNull(callable); + this.isPeriodic = isPeriodic; + + this.result = new CompletableFuture<>(); + } + + public boolean isPeriodic() { --- End diff -- nit: method
is unused > Add
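The `ManuallyTriggeredScheduledExecutor` under review exists so tests can decide exactly when deferred work runs. The essential trick, reduced to a plain `Executor` with hypothetical names:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** An Executor that defers every task until the test explicitly triggers it. */
public class ManualExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        tasks.add(command); // defer instead of running immediately
    }

    /** Runs everything queued so far; returns how many tasks were executed. */
    public int triggerAll() {
        int n = tasks.size();
        for (int i = 0; i < n; i++) {
            tasks.poll().run();
        }
        return n;
    }
}
```

A test schedules work through the executor, asserts on the "before" state, calls `triggerAll()`, then asserts on the "after" state — no sleeps, no races.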
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163911570 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), ""); --- End diff -- The `errorMessage` is an empty string. Leave it out completely or put something meaningful. ---
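The nit above concerns `checkArgument` being called with an empty error message. A small sketch (stdlib only, mimicking the Guava/Flink `Preconditions` contract; the caller method is invented for illustration) shows why a descriptive message is worth the extra string:

```java
/** Stdlib stand-in for the Guava/Flink Preconditions.checkArgument contract. */
public class Check {
    public static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    /** Hypothetical caller: the message names the violated expectation and the offending value. */
    public static String describe(int state, boolean terminal) {
        checkArgument(terminal, "Job must be in a globally terminal state, but was: " + state);
        return "terminal:" + state;
    }
}
```

With an empty message, the resulting `IllegalArgumentException` tells the operator nothing; with a message like the one above, the failing precondition is obvious from the stack trace alone.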
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339568#comment-16339568 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163911570 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), ""); --- End diff -- The `errorMessage` is an empty string. Leave it out completely or put something meaningful. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in-memory > instances once they grow too big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user-defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163922121 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); + } + } + + /** +* Tests that null is returned if we request an unknown JobID. +*/ + @Test + public void testUnknownGet() throws IOException {
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919921 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); --- End diff -- It is not obvious what the matcher is doing. How about: `assertThat(...), isPredicateFulfilled(..))` ``` private static Matcher
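The reviewer suggests hiding the matcher behind a descriptively named factory method. Without depending on Hamcrest, the shape of such a predicate-backed matcher could look like this (all names are hypothetical):

```java
import java.util.function.Predicate;

/** A named, reusable assertion built from a predicate, in the spirit of a Hamcrest factory method. */
public final class NamedMatcher<T> {
    private final String description;
    private final Predicate<T> predicate;

    private NamedMatcher(String description, Predicate<T> predicate) {
        this.description = description;
        this.predicate = predicate;
    }

    /** Factory method: call sites read as matches("a partially equal graph", g -> ...). */
    public static <T> NamedMatcher<T> matches(String description, Predicate<T> predicate) {
        return new NamedMatcher<>(description, predicate);
    }

    public void assertThat(T actual) {
        if (!predicate.test(actual)) {
            throw new AssertionError("Expected " + description + " but got: " + actual);
        }
    }
}
```

The description travels with the predicate, so a failing assertion reports what was expected instead of an opaque matcher class name.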
[jira] [Commented] (FLINK-8506) fullRestarts Gauge not incremented when jobmanager got killed
[ https://issues.apache.org/jira/browse/FLINK-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339560#comment-16339560 ] Steven Zhen Wu commented on FLINK-8506: --- Till, thanks for the explanation. Looks like we should clarify the doc, which says "since job submitted". [https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html] fullRestarts The total number of full restarts since this job was submitted (in milliseconds). Gauge So it seems that we don't have any metric to capture jobmanager failover. > fullRestarts Gauge not incremented when jobmanager got killed > - > > Key: FLINK-8506 > URL: https://issues.apache.org/jira/browse/FLINK-8506 > Project: Flink > Issue Type: Bug >Reporter: Steven Zhen Wu >Priority: Major > > [~till.rohrmann] When jobmanager node got killed, it will cause job restart. > But in this case, we didn't see the _fullRestarts_ gauge get incremented. Is this > expected or a bug? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5364: Flink 8472 1.4
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5364 Flink 8472 1.4 ## What is the purpose of the change Extend all migration tests, to include verifying restore from Flink 1.4 savepoints. This includes extending: - `WindowOperatorMigrationTest` - `CEPMigrationTest` - `StatefulJobSavepointMigrationTestITCase` (Scala API migration) - `StatefulJobSavepointMigrationTestITCase` (Java API migration) - `FlinkKinesisConsumerMigrationTest` - `FlinkKafkaConsumerBaseMigrationTest` - `ContinuousFileProcessingMigrationTest` - `BucketingSinkMigrationTest` This PR should also be forward-ported to the `master` branch to cover Flink 1.5. ## Brief change log All commits except 1ce3e6c are simply adding `MigrationVersion.v1_4` to the test parameters and adding the corresponding test savepoint files. 1ce3e6c is a refactor of `StatefulJobSavepointMigrationFrom12ITCase` and `StatefulJobSavepointMigrationFrom13ITCase` to a single `StatefulJobSavepointMigrationITCase` class. ## Verifying this change This is a test refactor / extension, so all existing tests should cover the changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8472-1.4 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5364.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5364 commit 28a1c4d403cc978d60af765f1e67bfb4bedd93c4 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T15:08:53Z [FLINK-8472] [DataStream, test] Extend WindowOperatorMigrationTest for Flink 1.4 commit 803c641b6c4e03431a960826f5a531423db3d5ef Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T15:13:14Z [FLINK-8472] [cep, test] Extend CEPMigrationTest for Flink 1.4 commit 6f6505232c7294a0c30e7cef9fbf2f9218e478b1 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T15:20:31Z [FLINK-8472] [scala, test] Extend StatefulJobSavepointMigrationITCase for Flink 1.4 commit 1ce3e6cd331d8791d28474d9838070005d4e37a3 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T17:25:45Z [FLINK-8472] [DataStream, test] Refactor StatefulJobSavepointFrom*MigrationITCase to single ITCase This commit refactors the StatefulJobSavepointFrom12MigrationITCase and StatefulJobSavepointFrom13MigrationITCase to a single class, StatefulJobSavepointMigrationITCase. The new ITCase is parameterized to ensure that all previous versions and state backend variants are covered.
commit 696b5b3f3927f63c1c0e7e550db14427dbe7a0cb Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T17:37:05Z [FLINK-8472] [DataStream, test] Extend StatefulJobSavepointMigrationITCase for Flink 1.4 commit 1d308604bf832c856b8b6f3d1a33d189ddab80e8 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T15:23:17Z [FLINK-8472] [kinesis, test] Extend FlinkKinesisConsumerMigrationTest for Flink 1.4 commit 2de2943ffed3f1ab8928e3d9be96c5c6d7dd4f96 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T16:23:22Z [FLINK-8472] [kafka, test] Extend FlinkKafkaConsumerBaseMigrationTest for Flink 1.4 commit bfee6ce9f842e7e2785ffb664001b42e2f7e5a44 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T16:28:17Z [FLINK-8472] [fs, test] Extend ContinuousFileProcessingMigrationTest for Flink 1.4 commit 141236219b0afd4093ae891398564f39539875e0 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T16:32:28Z [FLINK-8472] [fs, test] Extend BucketingSinkMigrationTest for Flink 1.4 commit ceb00c7b210dca4f4c2e2bd5f2f399d5f0880934 Author: Tzu-Li (Gordon) Tai Date: 2018-01-25T16:45:47Z [hotfix] [test] Remove stale savepoint files no longer used by migration tests This includes: - Removing MigrationVersion.v1_1, since compatibility for 1.1 is no longer supported
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339532#comment-16339532 ] ASF GitHub Bot commented on FLINK-8240: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5240 Thanks for the feedback @fhueske. I hope I could address most of it. I think we should merge this PR (if you agree) and add more PRs for this issue as the next steps. I suggest the following subtasks: - Add validation for the CSV format - Add full CsvTableSourceFactory support (incl. proctime, rowtime, and schema mapping) - Add a JSON schema parser to the JSON and logic for creating a table source from it - Add validation for the JSON format - Add validation for the Rowtime descriptor - Add validation for StreamTableDescriptor - Add validation for BatchTableDescriptor - Add KafkaTableSource factories What do you think? > Create unified interfaces to configure and instantiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generalize this > interface. > In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g.
topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
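The declaration tree quoted above maps naturally onto flat string properties, which is one way such a unified, format-agnostic interface can be discovered and validated. A minimal sketch in Java; note that all key names here (`source.type`, `encoding.type`, and so on) are hypothetical illustrations of the tree, not the keys the PR actually defines:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TableSourceProperties {

    // Build a flat property map mirroring the Source/Encoding/Rowtime parts.
    // Every key name is made up for illustration only.
    static Map<String, String> describeKafkaJsonSource() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("source.type", "kafka");                    // Source -> Type
        props.put("source.properties.topic", "orders");       // Source -> Properties
        props.put("encoding.type", "json");                   // Encoding -> Type
        props.put("encoding.schema.user", "VARCHAR");         // Encoding -> Schema
        props.put("rowtime.watermark.strategy", "ascending"); // Rowtime descriptor
        return props;
    }

    // A trivial "validator": every declaration must at least name its source type.
    static void validate(Map<String, String> props) {
        if (!props.containsKey("source.type")) {
            throw new IllegalArgumentException("missing source.type");
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = describeKafkaJsonSource();
        validate(props);
        System.out.println(props.get("source.type")); // prints "kafka"
    }
}
```

Flattening the declaration to string properties is attractive for exactly the discovery and validation goals the issue names: any factory can inspect the same map without knowing the concrete source class.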
[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe
[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339509#comment-16339509 ] ASF GitHub Bot commented on FLINK-8450:
---
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/5309

Thanks for the review @GJL. I've addressed your comments in cf9d24d and rebased onto the latest master.

> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Labels: flip-6
> Fix For: 1.5.0
>
> Currently, {{RestfulGateway#requestJob}} returns a {{CompletableFuture<AccessExecutionGraph>}}. Since {{AccessExecutionGraph}} is not serializable, the call could fail if we execute this RPC from a remote system. In order to make it type safe we should change its signature to {{SerializableExecutionGraph}}.
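The serializability concern above is concrete: an RPC layer typically ships return values through Java serialization, so a non-serializable graph type only fails once the call actually crosses a process boundary. A small self-contained sketch of that mechanism; `SerializableGraph` is a made-up stand-in, not a Flink type:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RpcSerializationDemo {

    // Stand-in for something like a SerializableExecutionGraph: safe to ship over RPC.
    static class SerializableGraph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        SerializableGraph(String jobName) { this.jobName = jobName; }
    }

    // What an RPC transport effectively does with a return value:
    // serialize on the sender, deserialize on the receiver.
    static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value); // throws NotSerializableException for unsafe types
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableGraph copy =
            (SerializableGraph) roundTrip(new SerializableGraph("wordcount"));
        System.out.println(copy.jobName); // prints "wordcount"
    }
}
```

Returning a type that declares `Serializable` in the gateway signature makes this failure mode a compile-time property instead of a runtime surprise, which is the point of the issue.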
[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe
[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339507#comment-16339507 ] ASF GitHub Bot commented on FLINK-8450:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5309#discussion_r163905384

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java ---
@@ -228,10 +246,10 @@ public void testConcurrentAccess() throws Exception {
 Collection<AccessExecutionGraph> allExecutionGraphs = allExecutionGraphFutures.get();
--- End diff --

True, thanks for the info. Didn't know `ExecutorCompletionService` before.
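The `ExecutorCompletionService` mentioned in the review is a standard `java.util.concurrent` utility: it hands back futures in completion order rather than submission order, which is handy when a test collects results from many concurrent requests. A minimal, self-contained usage sketch, unrelated to the Flink test itself:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceDemo {

    // Submit n tasks and gather their results as each one finishes.
    static List<Integer> squaresInCompletionOrder(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletionService<Integer> completionService =
                new ExecutorCompletionService<>(pool);
            for (int i = 0; i < n; i++) {
                final int value = i;
                completionService.submit(() -> value * value);
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                // take() blocks until the *next completed* task, whichever that is.
                results.add(completionService.take().get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> results = squaresInCompletionOrder(8);
        System.out.println(results.size()); // prints 8; element order is nondeterministic
    }
}
```

Compared with polling a list of futures, the completion service never makes the consumer wait on a slow task while faster ones are already done.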
[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe
[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339504#comment-16339504 ] ASF GitHub Bot commented on FLINK-8450:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5309#discussion_r163904466

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java ---
@@ -199,12 +212,17 @@ public void testCacheEntryCleanup() throws Exception {
 public void testConcurrentAccess() throws Exception {
 final Time timeout = Time.milliseconds(100L);
 final Time timeToLive = Time.hours(1L);
- final JobID jobId = new JobID();
-
- final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class);
- final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
- when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+ final AtomicInteger requestJobCalls = new AtomicInteger(0);
--- End diff --

True, will change it.
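The diff above swaps a Mockito `when(...).thenReturn(...)` stub for a hand-written gateway that counts invocations with an `AtomicInteger`. The general pattern can be sketched as follows; the `CountingGateway` name and shape are illustrative, not the actual `CountingRestfulGateway` from the PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingStubDemo {

    // A hand-rolled test double: returns a canned future
    // and records how often it was asked.
    static class CountingGateway {
        private final AtomicInteger requestJobCalls = new AtomicInteger(0);
        private final CompletableFuture<String> cannedResponse;

        CountingGateway(CompletableFuture<String> cannedResponse) {
            this.cannedResponse = cannedResponse;
        }

        CompletableFuture<String> requestJob() {
            requestJobCalls.incrementAndGet(); // thread-safe call counting
            return cannedResponse;
        }

        int getNumRequestJobCalls() {
            return requestJobCalls.get();
        }
    }

    public static void main(String[] args) {
        CountingGateway gateway =
            new CountingGateway(CompletableFuture.completedFuture("graph"));
        gateway.requestJob();
        gateway.requestJob();
        System.out.println(gateway.getNumRequestJobCalls()); // prints 2
    }
}
```

For concurrency tests, a double like this is preferable to `verify(mock, times(n))`: the atomic counter is safe under parallel calls and the test does not depend on mocking-framework internals.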
[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe
[ https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339499#comment-16339499 ] ASF GitHub Bot commented on FLINK-8450:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5309#discussion_r163902431

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java ---
@@ -30,62 +30,69 @@
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
 import org.junit.Test;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;

 /**
  * Tests for the {@link ExecutionGraphCache}.
  */
 public class ExecutionGraphCacheTest extends TestLogger {

+ private static ArchivedExecutionGraph expectedExecutionGraph;
+ private static final JobID expectedJobId = new JobID();
+
+ @BeforeClass
+ public static void setup() {
+ expectedExecutionGraph = new ArchivedExecutionGraphBuilder().build();
+ }
+
 /**
  * Tests that we can cache AccessExecutionGraphs over multiple accesses.
  */
 @Test
 public void testExecutionGraphCaching() throws Exception {
 final Time timeout = Time.milliseconds(100L);
 final Time timeToLive = Time.hours(1L);
- final JobID jobId = new JobID();
- final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class);
- final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
- when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
+ final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph));

 try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) {
- CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway);
+ CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);

- assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get());
+ assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get());

- CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway);
+ CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);

- assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get());
+ assertEquals(expectedExecutionGraph, accessExecutionGraphFuture2.get());
--- End diff --

True, will remove it.
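The test being reviewed exercises a cache of in-flight `CompletableFuture`s with a time-to-live: a second lookup within the TTL must be served from the cache rather than triggering a second gateway request. A simplified sketch of that idea, assuming nothing about the internals of Flink's actual `ExecutionGraphCache`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class TtlFutureCacheDemo {

    // Cache entry: the future plus its expiry timestamp.
    static final class Entry<V> {
        final CompletableFuture<V> future;
        final long expiryMillis;
        Entry(CompletableFuture<V> future, long expiryMillis) {
            this.future = future;
            this.expiryMillis = expiryMillis;
        }
    }

    static final class TtlFutureCache<K, V> {
        private final ConcurrentMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
        private final long ttlMillis;

        TtlFutureCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

        // Reuse a live entry; otherwise invoke the loader (e.g. an RPC) once.
        // compute() runs atomically per key, so concurrent callers share one load.
        CompletableFuture<V> get(K key, Supplier<CompletableFuture<V>> loader) {
            long now = System.currentTimeMillis();
            return cache.compute(key, (k, old) ->
                (old == null
                    || old.expiryMillis < now
                    || old.future.isCompletedExceptionally())
                    ? new Entry<>(loader.get(), now + ttlMillis)
                    : old
            ).future;
        }
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger(0);
        TtlFutureCache<String, String> cache = new TtlFutureCache<>(60_000L);
        Supplier<CompletableFuture<String>> loader = () -> {
            loads.incrementAndGet();
            return CompletableFuture.completedFuture("graph");
        };

        cache.get("job-1", loader);
        cache.get("job-1", loader); // served from cache within the TTL
        System.out.println(loads.get()); // prints 1
    }
}
```

Caching the future itself, instead of the completed value, also deduplicates concurrent requests for the same key while the first load is still in flight.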