[jira] [Commented] (FLINK-31135) ConfigMap DataSize went > 1 MB and cluster stopped working
[ https://issues.apache.org/jira/browse/FLINK-31135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698330#comment-17698330 ] ramkrishna.s.vasudevan commented on FLINK-31135: So are we adding all the checkpoint data back to the CR? > ConfigMap DataSize went > 1 MB and cluster stopped working > -- > > Key: FLINK-31135 > URL: https://issues.apache.org/jira/browse/FLINK-31135 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.2.0 >Reporter: Sriram Ganesh >Priority: Major > > I am using the Flink Operator to manage clusters. Flink version: 1.15.2. Flink jobs > failed with the below error. It seems the ConfigMap size went beyond 1 MB > (the default limit). > Since it is managed by the operator and the ConfigMaps are not updated with any > manual intervention, I suspect it could be an operator issue. > > {code:java} > Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure > executing: PUT at: > https:///api/v1/namespaces//configmaps/-config-map. Message: > ConfigMap "-config-map" is invalid: []: Too long: must have at most > 1048576 bytes. Received status: Status(apiVersion=v1, code=422, > details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must > have at most 1048576 bytes, reason=FieldValueTooLong, > additionalProperties={})], group=null, kind=ConfigMap, name=-config-map, > retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, > message=ConfigMap "-config-map" is invalid: []: Too long: must have at > most 1048576 bytes, metadata=ListMeta(_continue=null, > remainingItemCount=null, resourceVersion=null, selfLink=null, > additionalProperties={}), reason=Invalid, status=Failure, > additionalProperties={}). 
> at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:521) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:347) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:327) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:781) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:183) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:188) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:130) > ~[flink-dist-1.15.2.jar:1.15.2] > at > io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:41) > ~[flink-dist-1.15.2.jar:1.15.2] > at > org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:325) > ~[flink-dist-1.15.2.jar:1.15.2] > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) > ~[?:?] > ... 3 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
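The failure above is the Kubernetes apiserver rejecting a ConfigMap whose serialized payload exceeds 1048576 bytes. As a rough illustration of that limit (a hypothetical helper, not operator or fabric8 code), the size of the data section can be estimated client-side before attempting the PUT:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper showing the 1 MiB ceiling the PUT above ran into.
public class ConfigMapSizeCheck {
    // The apiserver's default limit for a ConfigMap's payload, in bytes.
    static final long MAX_CONFIG_MAP_BYTES = 1048576L;

    /** Rough size of a ConfigMap data section: UTF-8 bytes of every key and value. */
    public static long approximateSize(Map<String, String> data) {
        long total = 0;
        for (Map.Entry<String, String> e : data.entrySet()) {
            total += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    public static boolean fitsInConfigMap(Map<String, String> data) {
        return approximateSize(data) <= MAX_CONFIG_MAP_BYTES;
    }

    public static void main(String[] args) {
        Map<String, String> small = Map.of("checkpoint-1", "metadata");
        System.out.println(fitsInConfigMap(small)); // a tiny entry fits easily
    }
}
```

The real limit applies to the whole serialized object (keys, values, and metadata), so this estimate is only a lower bound; it is merely meant to show why accumulating checkpoint entries in a single ConfigMap eventually hits the 1 MiB ceiling.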
[jira] [Created] (FLINK-31376) CSVReader for streaming does not support splittable
ramkrishna.s.vasudevan created FLINK-31376: -- Summary: CSVReader for streaming does not support splittable Key: FLINK-31376 URL: https://issues.apache.org/jira/browse/FLINK-31376 Project: Flink Issue Type: Improvement Reporter: ramkrishna.s.vasudevan Using CsvReaderFormat, when we create the StreamFormatAdapter it will not support 'splittable'. This task is targeted towards supporting file splits when we create the FileSource over a CSV file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
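For context on what 'splittable' buys: a splittable format lets the FileSource hand out byte-range splits of a single file to parallel readers, instead of one split per file. The sketch below is a hypothetical illustration of split computation, not Flink's StreamFormatAdapter code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: carve one file into byte-range splits that readers
// can consume in parallel. A non-splittable format forces the whole file
// to be read as a single split.
public class FileSplitter {
    /** A byte range [offset, offset + length) of one file. */
    public static final class Split {
        public final long offset;
        public final long length;

        Split(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "Split(" + offset + ", " + length + ")";
        }
    }

    /** Divide a file of fileLength bytes into splits of at most blockSize bytes each. */
    public static List<Split> computeSplits(long fileLength, long blockSize) {
        List<Split> splits = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            splits.add(new Split(offset, Math.min(blockSize, fileLength - offset)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 250-byte file with 100-byte blocks yields splits of 100, 100 and 50 bytes.
        System.out.println(computeSplits(250, 100));
    }
}
```

For CSV the extra difficulty is that splits must then be aligned to record (line) boundaries, since a byte-range split may begin or end in the middle of a row.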
[jira] [Closed] (FLINK-30127) Correct the packaging of HadoopRecoverableWriter and related issues
[ https://issues.apache.org/jira/browse/FLINK-30127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan closed FLINK-30127. -- Release Note: Fixed as part of https://issues.apache.org/jira/browse/FLINK-30128 Resolution: Fixed > Correct the packaging of HadoopRecoverableWriter and related issues > --- > > Key: FLINK-30127 > URL: https://issues.apache.org/jira/browse/FLINK-30127 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > > The first issue here is that HadoopRecoverableWriter and the related > RecoverableWriter classes are not found in the azure-fs package. We need to > fix this so that we can use the hadoop-common libraries to create the ABFS > wrappers for the streaming sink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30588) Improvement in truncate code in the new ABFSOutputstream
[ https://issues.apache.org/jira/browse/FLINK-30588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-30588: --- Description: There are a few TODOs in the new ABFSOutputstream. 1) Currently the buffer used to read and perform the truncate operation from the source file is 2 MB. We can make this configurable based on future usage and perf results. 2) Currently while writing to the temp file we write the entire data and only close/flush at the end. Based on usage we could call intermediate flush/sync on the files. > Improvement in truncate code in the new ABFSOutputstream > - > > Key: FLINK-30588 > URL: https://issues.apache.org/jira/browse/FLINK-30588 > Project: Flink > Issue Type: Sub-task >Reporter: ramkrishna.s.vasudevan >Priority: Major > > There are a few TODOs in the new ABFSOutputstream. > 1) Currently the buffer used to read and perform the truncate operation from the source > file is 2 MB. We can make this configurable based on future usage > and perf results. > 2) Currently while writing to the temp file we write the entire data and only > close/flush at the end. Based on usage we could call intermediate > flush/sync on the files. -- This message was sent by Atlassian Jira (v8.20.10#820010)
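Both TODOs can be seen in a minimal truncate-by-copy sketch (hypothetical names and plain java.nio, not the actual ABFSOutputstream code): copy the first N bytes of the source into a temp file using a configurable buffer, then swap the temp file in.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical truncate-by-copy sketch illustrating the two TODOs above.
public class TruncateByCopy {

    // Item 1 above: the fixed 2 MB read buffer could become configurable.
    static final int DEFAULT_BUFFER_SIZE = 2 * 1024 * 1024;

    /** "Truncate" source to truncateLength bytes by copying into a temp file. */
    public static void truncate(Path source, long truncateLength, int bufferSize) {
        try {
            Path temp = Files.createTempFile("truncate", ".tmp");
            byte[] buffer = new byte[bufferSize];
            long remaining = truncateLength;
            try (InputStream in = Files.newInputStream(source);
                 OutputStream out = Files.newOutputStream(temp)) {
                while (remaining > 0) {
                    int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                    if (read < 0) break; // source shorter than requested length
                    out.write(buffer, 0, read);
                    remaining -= read;
                    // Item 2 above: an intermediate flush/sync could be issued
                    // here instead of relying solely on the final close.
                }
            }
            Files.move(temp, source, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Self-contained demo: truncate a 5-byte file to 3 bytes, return the new size. */
    public static long demo() {
        try {
            Path p = Files.createTempFile("demo", ".dat");
            Files.write(p, new byte[] {1, 2, 3, 4, 5});
            truncate(p, 3, 2); // tiny buffer, just to exercise the copy loop
            return Files.size(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```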
[jira] [Created] (FLINK-30588) Improvement in truncate code in the new ABFSOutputstream
ramkrishna.s.vasudevan created FLINK-30588: -- Summary: Improvement in truncate code in the new ABFSOutputstream Key: FLINK-30588 URL: https://issues.apache.org/jira/browse/FLINK-30588 Project: Flink Issue Type: Sub-task Reporter: ramkrishna.s.vasudevan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-18568) Add Support for Azure Data Lake Store Gen 2 in File Sink
[ https://issues.apache.org/jira/browse/FLINK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648354#comment-17648354 ] ramkrishna.s.vasudevan commented on FLINK-18568: [~psrinivasulu] - can you take a look at this PR when you get some time? > Add Support for Azure Data Lake Store Gen 2 in File Sink > > > Key: FLINK-18568 > URL: https://issues.apache.org/jira/browse/FLINK-18568 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Connectors / Common >Affects Versions: 1.12.0 >Reporter: Israel Ekpo >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: auto-deprioritized-major > > The objective of this improvement is to add support for Azure Data Lake Store > Gen 2 (ADLS Gen2) [2] as one of the supported filesystems for the FileSink [1] > [1] > https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/filesystem/#file-sink > [2] https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-30128: --- Attachment: (was: Flink_ABFS_support_1.pdf) > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643653#comment-17643653 ] ramkrishna.s.vasudevan edited comment on FLINK-30128 at 12/6/22 4:36 AM: - While trying to add tests for the Azure fs, it seems some of the IT tests, such as AzureFileSystemBehaviorITCase, are already not running in the CI. The ones that do run are AzureBlobStorageFSFactoryTest and AzureDataLakeStoreGen2FSFactoryTest. Any idea on how we should be adding those tests here? For reference {code} Dec 05 17:06:36 [INFO] Running org.apache.flink.fs.azurefs.AzureFileSystemBehaviorITCase Dec 05 17:06:37 [INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.792 s - in org.apache.flink.fs.azurefs.AzureFileSystemBehaviorITCase Dec 05 17:06:37 [INFO] Dec 05 17:06:37 [INFO] Results: Dec 05 17:06:37 [INFO] Dec 05 17:06:37 [INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0 Dec 05 17:06:37 [INFO] Dec 05 17:06:37 [INFO] Dec 05 17:06:37 [INFO] --- japicmp-maven-plugin:0.17.1.1_m325:cmp (default) @ flink-azure-fs-hadoop --- Dec 05 17:06:37 [INFO] Skipping execution because parameter 'skip' was set to true. {code} was (Author: ram_krish): Trying to add tests for Azure fs, seems some of the IT tests are already not running in the CI AzureFileSystemBehaviorITCase. The others that run are AzureBlobStorageFSFactoryTest and AzureDataLakeStoreGen2FSFactoryTest. Any idea on how we should be adding those tests here? > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > Attachments: Flink_ABFS_support_1.pdf > > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. 
It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643653#comment-17643653 ] ramkrishna.s.vasudevan commented on FLINK-30128: While trying to add tests for the Azure fs, it seems some of the IT tests, such as AzureFileSystemBehaviorITCase, are already not running in the CI. The ones that do run are AzureBlobStorageFSFactoryTest and AzureDataLakeStoreGen2FSFactoryTest. Any idea on how we should be adding those tests here? > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > Attachments: Flink_ABFS_support_1.pdf > > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-30128: --- Attachment: Flink_ABFS_support_1.pdf > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > Attachments: Flink_ABFS_support_1.pdf > > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-30128: --- Attachment: (was: Flink_ABFS_support.pdf) > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642287#comment-17642287 ] ramkrishna.s.vasudevan commented on FLINK-30128: Attaching a simple diagram that shows what the class hierarchy will look like. Basically HadoopFileSystem will be extended to create AzureBlobFileSystem. Internally it would create the AzureBlobRecoverableWriter, which will work with AzureBlobFsRecoverableDataOutputStream. Will raise a PR for this after some more testing. > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > Attachments: Flink_ABFS_support.pdf > > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
[ https://issues.apache.org/jira/browse/FLINK-30128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-30128: --- Attachment: Flink_ABFS_support.pdf > Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path > -- > > Key: FLINK-30128 > URL: https://issues.apache.org/jira/browse/FLINK-30128 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.13.1 >Reporter: ramkrishna.s.vasudevan >Priority: Major > Attachments: Flink_ABFS_support.pdf > > > Currently the HadoopRecoverableWriter assumes that the underlying FS is > Hadoop and so it checks for DistributedFileSystem. It also tries to do a > truncate and ensure the lease is recovered before the 'rename' operation is > done. > In the Azure Data lake gen 2 world, the driver does not support truncate and > lease recovery API. We should be able to get the last committed size and if > it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30128) Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
ramkrishna.s.vasudevan created FLINK-30128: -- Summary: Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path Key: FLINK-30128 URL: https://issues.apache.org/jira/browse/FLINK-30128 Project: Flink Issue Type: Sub-task Affects Versions: 1.13.1 Reporter: ramkrishna.s.vasudevan Currently the HadoopRecoverableWriter assumes that the underlying FS is Hadoop and so it checks for DistributedFileSystem. It also tries to do a truncate and ensure the lease is recovered before the 'rename' operation is done. In the Azure Data lake gen 2 world, the driver does not support truncate and lease recovery API. We should be able to get the last committed size and if it matches go for the rename. Will be back with more details here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
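The proposed commit path, checking the last committed size and then renaming instead of doing truncate plus lease recovery, can be sketched as follows. This is a hedged illustration in plain java.nio, not the Hadoop FileSystem API the real writer would use:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hedged sketch of the commit path described above: since the ABFS driver
// offers neither truncate nor lease recovery, verify that the in-progress
// file already has exactly the committed length recorded at checkpoint time,
// and only then rename it to its final name.
public class SizeCheckedCommit {

    /** Returns true and renames if the length matches the committed size. */
    public static boolean commit(Path inProgress, Path target, long committedLength) {
        try {
            if (Files.size(inProgress) != committedLength) {
                // With a truncate API, trailing uncommitted bytes could be cut
                // off here; without one, the mismatch must be handled differently.
                return false;
            }
            Files.move(inProgress, target, StandardCopyOption.ATOMIC_MOVE);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Self-contained demo: try to commit a 4-byte in-progress file. */
    public static boolean demo(long committedLength) {
        try {
            Path src = Files.createTempFile("part", ".inprogress");
            Files.write(src, new byte[] {1, 2, 3, 4});
            Path dst = src.resolveSibling(src.getFileName() + ".final");
            return commit(src, dst, committedLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(4)); // sizes match, so the rename succeeds
    }
}
```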
[jira] [Created] (FLINK-30127) Correct the packaging of HadoopRecoverableWriter and related issues
ramkrishna.s.vasudevan created FLINK-30127: -- Summary: Correct the packaging of HadoopRecoverableWriter and related issues Key: FLINK-30127 URL: https://issues.apache.org/jira/browse/FLINK-30127 Project: Flink Issue Type: Sub-task Affects Versions: 1.13.1 Reporter: ramkrishna.s.vasudevan The first issue here is that HadoopRecoverableWriter and the related RecoverableWriter classes are not found in the azure-fs package. We need to fix this so that we can use the hadoop-common libraries to create the ABFS wrappers for the streaming sink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-18568) Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink
[ https://issues.apache.org/jira/browse/FLINK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636992#comment-17636992 ] ramkrishna.s.vasudevan commented on FLINK-18568: [~jinq0123] Yes, I am working on it. Please keep this JIRA assigned to me. We are working towards this and will share PRs. Before that I plan to raise some subtasks under this. > Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink > -- > > Key: FLINK-18568 > URL: https://issues.apache.org/jira/browse/FLINK-18568 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Connectors / Common >Affects Versions: 1.12.0 >Reporter: Israel Ekpo >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.17.0 > > > The objective of this improvement is to add support for Azure Data Lake Store > Gen 2 (ADLS Gen2) [2] as one of the supported filesystems for the Streaming > File Sink [1] > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html > [2] https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-18568) Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink
[ https://issues.apache.org/jira/browse/FLINK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-18568: --- Labels: auto-deprioritized-major (was: auto-deprioritized-major stale-assigned) > Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink > -- > > Key: FLINK-18568 > URL: https://issues.apache.org/jira/browse/FLINK-18568 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Connectors / Common >Affects Versions: 1.12.0 >Reporter: Israel Ekpo >Assignee: Srinivasulu Punuru >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.17.0 > > > The objective of this improvement is to add support for Azure Data Lake Store > Gen 2 (ADLS Gen2) [2] as one of the supported filesystems for the Streaming > File Sink [1] > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html > [2] https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-18568) Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink
[ https://issues.apache.org/jira/browse/FLINK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626521#comment-17626521 ] ramkrishna.s.vasudevan commented on FLINK-18568: [~psrinivasulu] - I would like to take a stab at this. Is it ok if I can assign this to myself? > Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink > -- > > Key: FLINK-18568 > URL: https://issues.apache.org/jira/browse/FLINK-18568 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Connectors / Common >Affects Versions: 1.12.0 >Reporter: Israel Ekpo >Assignee: Srinivasulu Punuru >Priority: Minor > Labels: auto-deprioritized-major, stale-assigned > Fix For: 1.17.0 > > > The objective of this improvement is to add support for Azure Data Lake Store > Gen 2 (ADLS Gen2) [2] as one of the supported filesystems for the Streaming > File Sink [1] > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html > [2] https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-14295) Nightly flink-runtime failed with java 11
[ https://issues.apache.org/jira/browse/FLINK-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16944456#comment-16944456 ] ramkrishna.s.vasudevan commented on FLINK-14295: I see. Thanks. Let me see if I get any failure. > Nightly flink-runtime failed with java 11 > - > > Key: FLINK-14295 > URL: https://issues.apache.org/jira/browse/FLINK-14295 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Yu Li >Priority: Critical > Labels: test-stability > > The core-jdk11 part of nightly test failed with below error: > {noformat} > 22:09:38.176 [ERROR] Failures: > 22:09:38.180 [ERROR] > TaskExecutorSubmissionTest.testRequestStackTraceSample:637 > expected:<[updateTaskExecutionState]> but > was:<[lambda$updateTaskExecutionState$0]> > 22:09:38.185 [ERROR] Errors: > 22:09:38.185 [ERROR] > RecordWriterTest.testClearBuffersAfterInterruptDuringBlockingBufferRequest:165 > » NoSuchElement > 22:09:38.185 [INFO] > 22:09:38.186 [ERROR] Tests run: 3936, Failures: 1, Errors: 1, Skipped: 40 > {noformat} > Link of the build: https://api.travis-ci.org/v3/job/591086968/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14295) Nightly flink-runtime failed with java 11
[ https://issues.apache.org/jira/browse/FLINK-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16944370#comment-16944370 ] ramkrishna.s.vasudevan commented on FLINK-14295: [~chesnay] - thanks for the comment. I am running with JDK 11 (OpenJDK) but am not seeing the test failure. I enabled -X while running the test, but this failure does not happen. {code} [DEBUG] Forking command line: /bin/sh -c cd /home/flink/flink-runtime && /home/jdk-11.0.4+11/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseG1GC -jar /home/flink/flink-runtime/target/surefire/surefirebooter4621356499252557184.jar /home/flink/flink-runtime/target/surefire 2019-10-04T17-53-15_850-jvmRun1 surefire18278570483194581237tmp surefire_0623991537379306572tmp [INFO] Running org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest {code} You can see that the JDK here is 11. > Nightly flink-runtime failed with java 11 > - > > Key: FLINK-14295 > URL: https://issues.apache.org/jira/browse/FLINK-14295 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Yu Li >Priority: Critical > Labels: test-stability > > The core-jdk11 part of nightly test failed with below error: > {noformat} > 22:09:38.176 [ERROR] Failures: > 22:09:38.180 [ERROR] > TaskExecutorSubmissionTest.testRequestStackTraceSample:637 > expected:<[updateTaskExecutionState]> but > was:<[lambda$updateTaskExecutionState$0]> > 22:09:38.185 [ERROR] Errors: > 22:09:38.185 [ERROR] > RecordWriterTest.testClearBuffersAfterInterruptDuringBlockingBufferRequest:165 > » NoSuchElement > 22:09:38.185 [INFO] > 22:09:38.186 [ERROR] Tests run: 3936, Failures: 1, Errors: 1, Skipped: 40 > {noformat} > Link of the build: https://api.travis-ci.org/v3/job/591086968/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14295) Nightly flink-runtime failed with java 11
[ https://issues.apache.org/jira/browse/FLINK-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16943315#comment-16943315 ] ramkrishna.s.vasudevan commented on FLINK-14295: [~chesnay], [~liyu] I can help to solve this ticket. Just one query: while running the test with Java 11, is there any profile that I need to pass to the 'mvn test' command so that the test runs with Java 11 (assuming I already have JDK 11 on my test node)? > Nightly flink-runtime failed with java 11 > - > > Key: FLINK-14295 > URL: https://issues.apache.org/jira/browse/FLINK-14295 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Yu Li >Priority: Critical > Labels: test-stability > > The core-jdk11 part of nightly test failed with below error: > {noformat} > 22:09:38.176 [ERROR] Failures: > 22:09:38.180 [ERROR] > TaskExecutorSubmissionTest.testRequestStackTraceSample:637 > expected:<[updateTaskExecutionState]> but > was:<[lambda$updateTaskExecutionState$0]> > 22:09:38.185 [ERROR] Errors: > 22:09:38.185 [ERROR] > RecordWriterTest.testClearBuffersAfterInterruptDuringBlockingBufferRequest:165 > » NoSuchElement > 22:09:38.185 [INFO] > 22:09:38.186 [ERROR] Tests run: 3936, Failures: 1, Errors: 1, Skipped: 40 > {noformat} > Link of the build: https://api.travis-ci.org/v3/job/591086968/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-4808) Allow skipping failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202221#comment-16202221 ] ramkrishna.s.vasudevan commented on FLINK-4808: --- I can try working on it next week. I raised a PR for FLINK-4810, so I will try to refresh that against the latest code. Is that fine? If the issue is very critical and can't wait till next week, then I am fine if someone wants to fix it. > Allow skipping failed checkpoints > - > > Key: FLINK-4808 > URL: https://issues.apache.org/jira/browse/FLINK-4808 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.1.2, 1.1.3 >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > > Currently, if Flink cannot complete a checkpoint, it results in a failure and > recovery. > To make the impact of less stable storage infrastructure on the performance > of Flink less severe, Flink should be able to tolerate a certain number of > failed checkpoints and simply keep executing. > This should be controllable via a parameter, for example: > {code} > env.getCheckpointConfig().setAllowedFailedCheckpoints(3); > {code} > A value of {{-1}} could indicate an infinite number of checkpoint failures > tolerated by Flink. > The default value should still be {{0}}, to keep compatibility with the > existing behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
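The tolerance the quoted proposal describes boils down to a consecutive-failure counter. A minimal sketch (a hypothetical class, not Flink's actual implementation) honoring the proposed semantics, with -1 for unlimited tolerance and a default of 0 keeping today's fail-fast behavior:

```java
// Hypothetical sketch of the tolerance counter implied by
// setAllowedFailedCheckpoints(n): keep executing until more than n
// consecutive checkpoints have failed; -1 tolerates failures indefinitely.
public class CheckpointFailureCounter {
    private final int allowedFailedCheckpoints;
    private int consecutiveFailures = 0;

    public CheckpointFailureCounter(int allowedFailedCheckpoints) {
        this.allowedFailedCheckpoints = allowedFailedCheckpoints;
    }

    /** Record a failed checkpoint; returns true if the job should now fail over. */
    public boolean onCheckpointFailure() {
        if (allowedFailedCheckpoints == -1) {
            return false; // unlimited tolerance
        }
        consecutiveFailures++;
        return consecutiveFailures > allowedFailedCheckpoints;
    }

    /** A completed checkpoint resets the consecutive-failure count. */
    public void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }
}
```

With allowedFailedCheckpoints = 0, the very first failure triggers recovery, matching the existing behavior the proposal wants to preserve as the default.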
[jira] [Commented] (FLINK-4808) Allow skipping failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202223#comment-16202223 ] ramkrishna.s.vasudevan commented on FLINK-4808: --- Thanks once again. > Allow skipping failed checkpoints > - > > Key: FLINK-4808 > URL: https://issues.apache.org/jira/browse/FLINK-4808 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.1.2, 1.1.3 >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > > Currently, if Flink cannot complete a checkpoint, it results in a failure and > recovery. > To make the impact of less stable storage infrastructure on the performance > of Flink less severe, Flink should be able to tolerate a certain number of > failed checkpoints and simply keep executing. > This should be controllable via a parameter, for example: > {code} > env.getCheckpointConfig().setAllowedFailedCheckpoints(3); > {code} > A value of {{-1}} could indicate an infinite number of checkpoint failures > tolerated by Flink. > The default value should still be {{0}}, to keep compatibility with the > existing behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6284) Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007657#comment-16007657 ] ramkrishna.s.vasudevan commented on FLINK-6284: --- I can try to do this by end of today IST. > Incorrect sorting of completed checkpoints in > ZooKeeperCompletedCheckpointStore > --- > > Key: FLINK-6284 > URL: https://issues.apache.org/jira/browse/FLINK-6284 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Priority: Blocker > Fix For: 1.3.0 > > > Now all completed checkpoints are sorted in their paths when they are > recovered in {{ZooKeeperCompletedCheckpointStore}} . In the cases where the > latest checkpoint's id is not the largest in lexical order (e.g., "100" is > smaller than "99" in lexical order), Flink will not recover from the latest > completed checkpoint. > The problem can be easily observed by setting the checkpoint ids in > {{ZooKeeperCompletedCheckpointStoreITCase#testRecover()}} to be 99, 100 and > 101. > To fix the problem, we should explicitly sort found checkpoints in their > checkpoint ids, without the usage of > {{ZooKeeperStateHandleStore#getAllSortedByName()}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
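The bug in the quoted description is easy to reproduce outside ZooKeeper: sorting numeric checkpoint ids as strings orders "99" after "100" and "101", so taking the last element of a lexically sorted list recovers the wrong checkpoint. Sorting by the parsed id fixes the order. A small standalone demonstration:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Demonstrates the sorting bug described above, independent of ZooKeeper.
public class CheckpointSortDemo {
    /** Order ids the way a plain path sort does: lexicographically. */
    public static List<String> lexical(List<String> ids) {
        return ids.stream().sorted().collect(Collectors.toList());
    }

    /** Order ids by their numeric checkpoint id, as the fix requires. */
    public static List<String> numeric(List<String> ids) {
        return ids.stream()
                .sorted(Comparator.comparingLong(Long::parseLong))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("99", "100", "101");
        System.out.println(lexical(ids)); // "99" ends up last, so it looks latest
        System.out.println(numeric(ids)); // 101 correctly ends up last
    }
}
```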
[jira] [Commented] (FLINK-5960) Make CheckpointCoordinator less blocking
[ https://issues.apache.org/jira/browse/FLINK-5960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004100#comment-16004100 ] ramkrishna.s.vasudevan commented on FLINK-5960: --- I can work on this too, if no one else is already working on this. > Make CheckpointCoordinator less blocking > > > Key: FLINK-5960 > URL: https://issues.apache.org/jira/browse/FLINK-5960 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann > > Currently the {{CheckpointCoordinator}} locks its operation under a global > lock. This also includes writing checkpoint data out to a state storage. If > this operation blocks, then the whole checkpoint operator stands still. I > think we should rework the {{CheckpointCoordinator}} to make fewer > assumptions about external systems to tolerate write failures and timeouts. > Furthermore, we should try to limit the scope of locking and the execution of > potentially blocking operation under the lock. This will improve the runtime > behaviour of the {{CheckpointCoordinator}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
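The "limit the scope of locking" idea can be sketched as follows. This is a simplified illustration, not the actual CheckpointCoordinator code: only cheap bookkeeping happens under the lock, and the potentially blocking storage write is handed to a separate executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Simplified illustration of narrowing the coordinator's lock scope. */
public class LockScopeSketch {
    private final Object coordinatorLock = new Object();
    private long checkpointCounter;

    public CompletableFuture<Long> triggerCheckpoint(Executor ioExecutor) {
        final long checkpointId;
        synchronized (coordinatorLock) {
            // only cheap, in-memory bookkeeping happens under the lock
            checkpointId = ++checkpointCounter;
        }
        // the potentially blocking write to state storage runs outside the lock,
        // so a slow or failing storage no longer stalls the whole coordinator
        return CompletableFuture.supplyAsync(() -> writeToStorage(checkpointId), ioExecutor);
    }

    // placeholder for the actual storage write, which may block or time out
    private long writeToStorage(long checkpointId) {
        return checkpointId;
    }
}
```

Returning a future also makes it natural to attach timeout and failure handling instead of assuming the external system always responds promptly.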
[jira] [Commented] (FLINK-6284) Incorrect sorting of completed checkpoints in ZooKeeperCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004090#comment-16004090 ] ramkrishna.s.vasudevan commented on FLINK-6284: --- Can I take this up, if someone is not already working on this? > Incorrect sorting of completed checkpoints in > ZooKeeperCompletedCheckpointStore > --- > > Key: FLINK-6284 > URL: https://issues.apache.org/jira/browse/FLINK-6284 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Priority: Blocker > > Now all completed checkpoints are sorted in their paths when they are > recovered in {{ZooKeeperCompletedCheckpointStore}} . In the cases where the > latest checkpoint's id is not the largest in lexical order (e.g., "100" is > smaller than "99" in lexical order), Flink will not recover from the latest > completed checkpoint. > The problem can be easily observed by setting the checkpoint ids in > {{ZooKeeperCompletedCheckpointStoreITCase#testRecover()}} to be 99, 100 and > 101. > To fix the problem, we should explicitly sort found checkpoints in their > checkpoint ids, without the usage of > {{ZooKeeperStateHandleStore#getAllSortedByName()}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004086#comment-16004086 ] ramkrishna.s.vasudevan commented on FLINK-4810: --- [~StephanEwen] Can I rebase this PR with the current code? I am not sure of the current status of the CheckpointCoordinator. Has this already been taken care of? > Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful > checkpoints > > > Key: FLINK-4810 > URL: https://issues.apache.org/jira/browse/FLINK-4810 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen > > The Checkpoint coordinator should track the number of consecutive > unsuccessful checkpoints. > If more than {{n}} (configured value) checkpoints fail in a row, it should > call {{fail()}} on the execution graph to trigger a recovery. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970592#comment-15970592 ] ramkrishna.s.vasudevan commented on FLINK-5752: --- I completed the patch-related work but I don't have my Linux box to run some tests. Once I get it back I will submit a PR with test cases. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > Once FLINK-5698 is implemented then we should be making use of the feature to > push down the projections for a nested table. So in case of HBase if we have > {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query > that needs to select f2.c - then we should be specifically able to project > only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such > projections and HBaseTableSource should make use of that API to do the > projection. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956498#comment-15956498 ] ramkrishna.s.vasudevan commented on FLINK-5752: --- Started working on this. Will submit a PR before end of this week. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > Once FLINK-5698 is implemented then we should be making use of the feature to > push down the projections for a nested table. So in case of HBase if we have > {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query > that needs to select f2.c - then we should be specifically able to project > only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such > projections and HBaseTableSource should make use of that API to do the > projection. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942695#comment-15942695 ] ramkrishna.s.vasudevan commented on FLINK-5752: --- Will start working on this sometime this week. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > Once FLINK-5698 is implemented then we should be making use of the feature to > push down the projections for a nested table. So in case of HBase if we have > {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query > that needs to select f2.c - then we should be specifically able to project > only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such > projections and HBaseTableSource should make use of that API to do the > projection. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface
[ https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941590#comment-15941590 ] ramkrishna.s.vasudevan commented on FLINK-5698: --- Once this is integrated, I will update HBaseTableSource to work with NestedFieldsProjectableTableSource. Thanks [~tonycox]. > Add NestedFieldsProjectableTableSource interface > > > Key: FLINK-5698 > URL: https://issues.apache.org/jira/browse/FLINK-5698 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev > > Add a NestedFieldsProjectableTableSource interface for some TableSource > implementation that support nesting projection push-down. > The interface could look as follows > {code} > def trait NestedFieldsProjectableTableSource { > def projectNestedFields(fields: Array[String]): > NestedFieldsProjectableTableSource[T] > } > {code} > This interface works together with ProjectableTableSource -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface
[ https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925872#comment-15925872 ] ramkrishna.s.vasudevan commented on FLINK-5698: --- [~tonycox] Are you still actively working on this PR? Let me know if I can be of some help here. > Add NestedFieldsProjectableTableSource interface > > > Key: FLINK-5698 > URL: https://issues.apache.org/jira/browse/FLINK-5698 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev > > Add a NestedFieldsProjectableTableSource interface for some TableSource > implementation that support nesting projection push-down. > The interface could look as follows > {code} > def trait NestedFieldsProjectableTableSource { > def projectNestedFields(fields: Array[String]): > NestedFieldsProjectableTableSource[T] > } > {code} > This interface works together with ProjectableTableSource -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5962) Cancel checkpoint canceller tasks in CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-5962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896972#comment-15896972 ] ramkrishna.s.vasudevan commented on FLINK-5962: --- I can work on this [~till.rohrmann] - if you have not already started with it. > Cancel checkpoint canceller tasks in CheckpointCoordinator > -- > > Key: FLINK-5962 > URL: https://issues.apache.org/jira/browse/FLINK-5962 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Priority: Critical > > The {{CheckpointCoordinator}} register a canceller task for each running > checkpoint. The canceller task's responsibility is to cancel a checkpoint if > it takes too long to complete. We should cancel this task as soon as the > checkpoint has been completed, because otherwise we will keep many canceller > tasks around. This can eventually lead to an OOM exception. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
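The fix described above amounts to holding on to the ScheduledFuture of each timeout canceller and cancelling it as soon as the checkpoint completes, so finished cancellers do not pile up in the scheduler. A minimal sketch (names are hypothetical, not the actual coordinator code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Minimal sketch: cancel the per-checkpoint timeout task once the checkpoint completes. */
public class CancellerSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    /** Registers a canceller that fires only if the checkpoint exceeds its timeout. */
    public ScheduledFuture<?> scheduleCanceller(Runnable cancelCheckpoint, long timeoutMillis) {
        return timer.schedule(cancelCheckpoint, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called on completion: cancelling frees the task instead of leaving it queued. */
    public void onCheckpointComplete(ScheduledFuture<?> canceller) {
        canceller.cancel(false);
    }

    public void shutdown() {
        timer.shutdownNow();
    }
}
```

Without the cancel() call, every completed checkpoint leaves a pending task in the scheduler until its timeout fires, which is the accumulation that can eventually lead to the OOM mentioned in the issue.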
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896781#comment-15896781 ] ramkrishna.s.vasudevan commented on FLINK-4816: --- [~tonywei] I would like to continue with this. In fact, I was waiting for Stephan's feedback. In a day or two I will submit a PR. Thanks. > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887850#comment-15887850 ] ramkrishna.s.vasudevan commented on FLINK-4816: --- Going through the code, would it be better if, in the CheckpointCoordinator, when we assign the restored checkpoint state to the execution job vertices, we also set the latest checkpoint id on the vertices? Then when we call fail() on the Execution and find that the job vertex has a non-negative checkpoint id, we could wrap the throwable in a RestoreTaskException along with the checkpoint id, and if the job vertex has a negative id, wrap it in just a DeployTaskException. Ping [~StephanEwen]? > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-5752: -- Description: This is after the discussion to create NestedProjectableTableSource. Currently we support nested schema for the non-relational type of DBs like HBase. But this does not allow push down projection. This JIRA is to implement that. Once FLINK-5698 is implemented then we should be making use of the feature to push down the projections for a nested table. So in case of HBase if we have {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query that needs to select f2.c - then we should be specifically able to project only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such projections and HBaseTableSource should make use of that API to do the projection. [~fhueske], [~tonycox], [~jark] was: This is after the discussion to create NestedProjectableTableSource. Currently we support nested schema for the non-relational type of DBs like HBase. But this does not allow push down projection. This JIRA is to implement that. I just did a POC on the existing PR https://github.com/apache/flink/pull/3149. The idea was to allow the family#quallifier to be specified in the query directly and parse the family and qualifier info and directly push down for projections. I tried using fam$qual (with $) as separator but that had SQL parse error. Hence went with '_' as the separator and it worked. So now in order to maintain the order in which the projection has to be done we need to have a LinkedHashMap which has the qualifier to typeinfo mapping. This is because with f1_q1, f1_q2 as the schema, we could always project f1_q2 and the selection could be f1_q1. In that case the order of how the projection is done is very important so that there is no type mismatch. 
[~fhueske], [~tonycox], [~jark] > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > Once FLINK-5698 is implemented then we should be making use of the feature to > push down the projections for a nested table. So in case of HBase if we have > {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query > that needs to select f2.c - then we should be specifically able to project > only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such > projections and HBaseTableSource should make use of that API to do the > projection. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
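For illustration, projecting f2.c out of the nested {f1:{a, b}, f2:{c, d}} structure described above can be sketched with plain maps. The real change would go through the NestedFieldsProjectableTableSource API from FLINK-5698; all names here are illustrative, not Flink or HBase API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: nested projection over a family -> (qualifier -> value) row. */
public class NestedProjectionSketch {
    /** Resolves a nested field such as "f2.c" against the row, or null if absent. */
    static Object project(Map<String, Map<String, Object>> row, String nestedField) {
        String[] parts = nestedField.split("\\.", 2);
        Map<String, Object> family = row.get(parts[0]);
        return family == null ? null : family.get(parts[1]);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, matching the point above that the
        // order of projected qualifiers matters to avoid type mismatches
        Map<String, Map<String, Object>> row = new LinkedHashMap<>();
        row.put("f1", new LinkedHashMap<>(Map.of("a", 1, "b", 2)));
        row.put("f2", new LinkedHashMap<>(Map.of("c", 3, "d", 4)));
        System.out.println(project(row, "f2.c")); // only column 'c' under 'f2'
    }
}
```

The point of the push-down is that the scan sent to HBase would then request only family f2, qualifier c, instead of fetching whole rows and projecting afterwards.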
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15863265#comment-15863265 ] ramkrishna.s.vasudevan commented on FLINK-5752: --- Yes, I know that. The only thing I wanted to try was how the family#qualifier would have to be parsed with the existing ProjectableTableSource. The $ was not working. I did not know if the nested schema JIRA would be done now or later, so I just tried it. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > I just did a POC on the existing PR https://github.com/apache/flink/pull/3149. > The idea was to allow the family#quallifier to be specified in the query > directly and parse the family and qualifier info and directly push down for > projections. > I tried using fam$qual (with $) as separator but that had SQL parse error. > Hence went with '_' as the separator and it worked. > So now in order to maintain the order in which the projection has to be done > we need to have a LinkedHashMap which has the qualifier to typeinfo mapping. > This is because with f1_q1, f1_q2 as the schema, we could always project > f1_q2 and the selection could be f1_q1. In that case the order of how the > projection is done is very important so that there is no type mismatch. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5752) Support push down projections for HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15859357#comment-15859357 ] ramkrishna.s.vasudevan commented on FLINK-5752: --- Since FLINK-5698 is in progress I think my way of trying to do the push down without nested schema is not needed. Once FLINK-5698 is done and https://github.com/apache/flink/pull/3149 is also committed we can make this change in HBaseTableSource. > Support push down projections for HBaseTableSource > -- > > Key: FLINK-5752 > URL: https://issues.apache.org/jira/browse/FLINK-5752 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > This is after the discussion to create NestedProjectableTableSource. > Currently we support nested schema for the non-relational type of DBs like > HBase. > But this does not allow push down projection. This JIRA is to implement that. > I just did a POC on the existing PR https://github.com/apache/flink/pull/3149. > The idea was to allow the family#quallifier to be specified in the query > directly and parse the family and qualifier info and directly push down for > projections. > I tried using fam$qual (with $) as separator but that had SQL parse error. > Hence went with '_' as the separator and it worked. > So now in order to maintain the order in which the projection has to be done > we need to have a LinkedHashMap which has the qualifier to typeinfo mapping. > This is because with f1_q1, f1_q2 as the schema, we could always project > f1_q2 and the selection could be f1_q1. In that case the order of how the > projection is done is very important so that there is no type mismatch. > [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5698) Add NestedFieldsProjectableTableSource interface
[ https://issues.apache.org/jira/browse/FLINK-5698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15859353#comment-15859353 ] ramkrishna.s.vasudevan commented on FLINK-5698: --- Thanks for the PR. I think I get the idea of what is been done here. I would have loved to work in this JIRA. Anyway thanks for the update [~tonycox]. > Add NestedFieldsProjectableTableSource interface > > > Key: FLINK-5698 > URL: https://issues.apache.org/jira/browse/FLINK-5698 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev > > Add a NestedFieldsProjectableTableSource interface for some TableSource > implementation that support nesting projection push-down. > The interface could look as follows > {code} > def trait NestedFieldsProjectableTableSource { > def projectNestedFields(fields: Array[String]): > NestedFieldsProjectableTableSource[T] > } > {code} > This interface works together with ProjectableTableSource -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5752) Support push down projections for HBaseTableSource
ramkrishna.s.vasudevan created FLINK-5752: - Summary: Support push down projections for HBaseTableSource Key: FLINK-5752 URL: https://issues.apache.org/jira/browse/FLINK-5752 Project: Flink Issue Type: Sub-task Reporter: ramkrishna.s.vasudevan Assignee: ramkrishna.s.vasudevan This is after the discussion to create NestedProjectableTableSource. Currently we support nested schema for the non-relational type of DBs like HBase. But this does not allow push down projection. This JIRA is to implement that. I just did a POC on the existing PR https://github.com/apache/flink/pull/3149. The idea was to allow the family#qualifier to be specified in the query directly and parse the family and qualifier info and directly push down for projections. I tried using fam$qual (with $) as separator but that had a SQL parse error. Hence went with '_' as the separator and it worked. So now in order to maintain the order in which the projection has to be done we need to have a LinkedHashMap which has the qualifier to typeinfo mapping. This is because with f1_q1, f1_q2 as the schema, we could always project f1_q2 and the selection could be f1_q1. In that case the order of how the projection is done is very important so that there is no type mismatch. [~fhueske], [~tonycox], [~jark] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847206#comment-15847206 ] ramkrishna.s.vasudevan commented on FLINK-4810: --- bq. numUnsuccessfulCheckpointsTriggers We already have this counter in the CheckpointCoordinator, so we could safely compare it against the configured 'n' and trigger a fail on the ExecutionGraph? Ping [~StephanEwen]? > Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful > checkpoints > > > Key: FLINK-4810 > URL: https://issues.apache.org/jira/browse/FLINK-4810 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen > > The Checkpoint coordinator should track the number of consecutive > unsuccessful checkpoints. > If more than {{n}} (configured value) checkpoints fail in a row, it should > call {{fail()}} on the execution graph to trigger a recovery. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
ramkrishna.s.vasudevan commented on FLINK-4810 Re: Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints: Can I assign this to myself? I can try working on this from tomorrow, my time. -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (FLINK-4808) Allow skipping failed checkpoints
ramkrishna.s.vasudevan commented on FLINK-4808 Re: Allow skipping failed checkpoints: Ok. Stephan Ewen - thanks for looking into this. I can check item #2. -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (FLINK-4461) Ensure all the classes are tagged with suitable annotations
ramkrishna.s.vasudevan commented on FLINK-4461 Re: Ensure all the classes are tagged with suitable annotations: Is there a Maven plugin for this? If so, we can add it to the pom files of the respective modules. -- This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] [Commented] (FLINK-5554) Add sql operator to table api for getting columns from HBase
[ https://issues.apache.org/jira/browse/FLINK-5554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829453#comment-15829453 ] ramkrishna.s.vasudevan commented on FLINK-5554: --- Thanks for starting this thread via JIRA [~tonycox]. [~fhueske] I was actually asking the same thing in the other JIRA, FLINK-2168: how will the column-family model get translated into Calcite? The Apache Phoenix project understands the language of both Calcite and HBase and acts as the wrapper, because the family name is actually created and maintained by Phoenix. So to the SQL layer it is a purely relational query. But here in Flink that is not the case. We know there is a table in HBase but the data model is purely non-relational. bq. What should work straightforward is to have a TableSource which maps the schema of an HBase table into a relational schema. Not sure on that. bq. I think the HBaseTableSource should return a composite type (with column family and qualifier), and we can get columns by composite type accessing. Sorry, I am not getting this. Can someone shed more light on this? I am interested in working on this and getting the entire thing working. Thanks [~jark]. My other suggestion: would it make sense to add a PhoenixTableSource, so that it becomes easy to tap into the relational data model of Phoenix? > Add sql operator to table api for getting columns from HBase > > > Key: FLINK-5554 > URL: https://issues.apache.org/jira/browse/FLINK-5554 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Anton Solovev > > example of select query > {code} > table.select("f1:q1, f1:q2, f1:q3"); > {code} > or/and > {code} > table.select('f1:'q1, 'f1:'q2, 'f1:'q3); > {code} > let's discuss how to provide better api for selecting from HBase -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15825551#comment-15825551 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- Trying your suggestions. bq. Since the code in flink-hbase is implemented in Java, the new HBaseTableSource should do the same. If I try writing HBaseTableSource in Java, I don't have Java versions of BatchTableSource or ProjectableTableSource (they are in Scala), so do I need to create them first? And then link flink-hbase under flink-connectors to make use of flink-table? > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824291#comment-15824291 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- bq.However, I think we would need a custom TableInputFormat anyway which returns Row instead of Tuple which is restricted to 25 fields and does not support null. This was the reason. Actually I did not extend the TableInputFormat rather extended RichInputFormat. I created HBaseTableSource in flink-table and I created it as scala. I can change them to Java. Thanks [~fhueske]. > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823811#comment-15823811 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- A few questions on the package structure. Where should HBaseTableSource.scala be added? Should it be inside flink-libraries/flink-table? And where should the HBaseTableInputFormat be added? If I add it in the flink-java module like the other input formats, there is no HBase reference in the pom; can we modify the pom.xml of flink-java to include the HBase library? If I move the HBaseTableInputFormat to flink-connectors/flink-hbase, then HBaseTableSource.scala cannot access the HBaseTableInputFormat. Any suggestions here [~fhueske]? > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15818486#comment-15818486 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- One suggestion is that, though we are going to issue a scan (batch read) or get (random read), it is better we specify the family and qualifier to be used in that read. Otherwise we end up returning more results, and on the result we need to filter down to the fieldNames that were passed. This assumes that the fieldNames are unique and there are no identical column names in two different families. For creating the DataSet, we need to create some input format that implements ResultTypeQueryable, am I right? And that is where the conversion of the byte[] result to the specified TypeInfo happens, I believe. > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814976#comment-15814976 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- Thanks for the info. I am reading this and will get back here [~fhueske]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814435#comment-15814435 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- The timing was perfect. I was reading through some more Table API examples here and was about to ping you. bq.Calcite requires a relational schema. So we cannot support flexible schema. Ok. For now we can assume the HBase table has a fixed schema, say a table that was loaded from a flat file (like CSV). bq.We need unique field names. The user could explicitly specify how fields are named. Ok. Suppose we say {code} select * from table where 'col' = 10 {code} We need to work out how this SQL will be implemented. Something like what the Apache Phoenix-Calcite integration does? Calcite can take care of the grammar parsing, but we need to convert what the Calcite model gives us into actual queries on HBase, in this case including some filters etc. So will this JIRA also cover that big piece of work? And coming to data types: say the user wants the table column 'col' to be an integer. When data is written to this column we convert the int to a byte[] and store it, so on retrieval we should use the exact serde format that Flink is aware of, right? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
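The serde point at the end can be shown with a tiny round-trip. HBase stores every value as a byte[], so whatever wrote the cell and whatever reads it must agree on one encoding; java.nio.ByteBuffer is used here as a stand-in for HBase's Bytes utility:

```java
import java.nio.ByteBuffer;

// Encode an int the way it would be written into an HBase cell, then
// decode it back with the same fixed-width big-endian layout. Writer
// and reader must share this contract, otherwise the table source
// cannot map the raw bytes back to the declared column type.
class IntCellSerde {
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    static int fromBytes(byte[] raw) {
        return ByteBuffer.wrap(raw).getInt();
    }
}
```

The same round-trip discipline applies to every other declared column type (long, double, String and so on).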
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15811535#comment-15811535 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- Thanks [~fhueske]. I was going through the related JIRAs, and I was following how CSVTableSource works here. Initial questions: - Should we see HBase tables as NoSQL tables, or like normal tables with a proper schema that defines the set of columns per row? - In HBase, columns can have the same name under different column families. How do we present that abstracted view? - Next, we scan an HBase table and the result we get is in the form of bytes. Where can we do the type conversion to String, double, long etc.? Or maybe that is not needed for now? Or maybe Calcite is of help here? Sorry if my questions are naive; after some discussion I think we can discuss the design part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804049#comment-15804049 ] ramkrishna.s.vasudevan commented on FLINK-2168: --- [~fhueske] Can I work on this now? I can see that FLINK-3848 is done. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4815) Automatic fallback to earlier checkpoints when checkpoint restore fails
[ https://issues.apache.org/jira/browse/FLINK-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15798138#comment-15798138 ] ramkrishna.s.vasudevan commented on FLINK-4815: --- Can I take up one of the sub-tasks if no one is working on this? In fact, I can do all the sub-tasks one by one. > Automatic fallback to earlier checkpoints when checkpoint restore fails > --- > > Key: FLINK-4815 > URL: https://issues.apache.org/jira/browse/FLINK-4815 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stephan Ewen > > Flink should keep multiple completed checkpoints. > When the restore of one completed checkpoint fails for a certain number of > times, the CheckpointCoordinator should fall back to an earlier checkpoint to > restore. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4808) Allow skipping failed checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15798136#comment-15798136 ] ramkrishna.s.vasudevan commented on FLINK-4808: --- It seems some sub-tasks are unassigned. Can I take up one of them? > Allow skipping failed checkpoints > - > > Key: FLINK-4808 > URL: https://issues.apache.org/jira/browse/FLINK-4808 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.1.2, 1.1.3 >Reporter: Stephan Ewen >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > Currently, if Flink cannot complete a checkpoint, it results in a failure and > recovery. > To make the impact of less stable storage infrastructure on the performance > of Flink less severe, Flink should be able to tolerate a certain number of > failed checkpoints and simply keep executing. > This should be controllable via a parameter, for example: > {code} > env.getCheckpointConfig().setAllowedFailedCheckpoints(3); > {code} > A value of {{-1}} could indicate an infinite number of checkpoint failures > tolerated by Flink. > The default value should still be {{0}}, to keep compatibility with the > existing behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
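The semantics proposed in the issue description (a tolerated-failure count, {{-1}} for unlimited, default {{0}} to keep current behavior) can be sketched as a small counter. The class name here is illustrative, not the actual Flink API:

```java
// Tracks consecutive checkpoint failures and decides whether the job
// must fail over, following the proposal: 0 means fail on the first
// failure (existing behavior), -1 means tolerate any number.
class CheckpointFailureTracker {
    private final int allowedFailedCheckpoints;
    private int consecutiveFailures = 0;

    CheckpointFailureTracker(int allowedFailedCheckpoints) {
        this.allowedFailedCheckpoints = allowedFailedCheckpoints;
    }

    // Returns true if the job should fail over after this failure.
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return allowedFailedCheckpoints >= 0
                && consecutiveFailures > allowedFailedCheckpoints;
    }

    // A successful checkpoint resets the failure streak.
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }
}
```

Counting consecutive rather than total failures matches the intent of tolerating a flaky storage layer without ever masking a persistently broken one.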
[jira] [Assigned] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan reassigned FLINK-3322: - Assignee: ramkrishna.s.vasudevan > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
ramkrishna.s.vasudevan created FLINK-4615: - Summary: Reusing the memory allocated for the drivers and iterators Key: FLINK-4615 URL: https://issues.apache.org/jira/browse/FLINK-4615 Project: Flink Issue Type: Sub-task Reporter: ramkrishna.s.vasudevan Assignee: ramkrishna.s.vasudevan Raising this as a sub-task so that the work can be committed individually and reviewed more closely. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-3322: -- Attachment: FLINK-3322_reusingmemoryfordrivers.docx Attaching a doc with two approaches for reusing the memory for the drivers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15487192#comment-15487192 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~ggevay] I think I was able to make the driver-related changes as you suggested, with slight modifications. I will update a doc for that and open another PR with only the driver-related changes. If needed, I can combine the already submitted PR and the new one. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484571#comment-15484571 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- To move forward with this, I think, as [~ggevay] suggested, I could create two PRs: one for the allocation of memory for the sorters and another for the iterators. That way we can see the size of each change and, if needed, make one PR combining both and take that in. Any suggestions? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484104#comment-15484104 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- I did try changing the code so that the drivers hold on to the memory segments for iterative tasks, but that is indeed tedious: we need to touch a lot of places. Changing the sorters and their memory allocation was much more contained and easier to review. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15483225#comment-15483225 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~ggevay] I agree with you. But in my test I found that most of the allocation came from the different iterative tasks allocating pages in the sorters. Yes, the join drivers are also calling allocatePages every time. I am not sure we can make everything resettable; instead, we could let the task inform the driver that it is an iterative task, in which case the driver caches the memory pages it creates for the different join strategies, and the close call retains those memory segments at the driver level. So we may need to add a constructor to all the iterators that accepts a list of MemorySegments rather than only the MemoryManager. In any case, I have completed the code to make the sorters reuse the memory they created rather than creating it every time. Do you want to see that PR? I can then build on top of it so that the iterators also reuse the memory segments. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
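The caching idea discussed in this thread can be sketched independently of Flink: an iterative task retains its segments in a local cache instead of releasing them to the manager, and only lets go of them on the termination signal. Plain byte[] blocks stand in for Flink's MemorySegment, so this is a model of the scheme, not actual Flink code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of segment reuse for iterative tasks: allocate once,
// return segments to a local cache on release, and only discard them
// (here: hand them to the GC) when the iteration terminates.
class ReusingSegmentPool {
    private final int segmentSize;
    private final Deque<byte[]> cache = new ArrayDeque<>();
    int freshAllocations = 0; // how often new memory was really allocated

    ReusingSegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    byte[] allocate() {
        byte[] seg = cache.poll();
        if (seg == null) {
            freshAllocations++;
            seg = new byte[segmentSize];
        }
        return seg;
    }

    // Called at the end of a superstep: retain instead of releasing.
    void release(byte[] seg) {
        cache.push(seg);
    }

    // Called on the termination signal of the iterative task.
    void terminate() {
        cache.clear();
    }
}
```

Across supersteps, allocate() keeps handing out the same blocks, which is exactly the GC-pressure relief this issue is after.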
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476131#comment-15476131 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~ggevay] I once again checked your comments. I initially thought we need a reset method so that we reuse the memory segments that were allocated instead of closing. But from reading the code, it is not so simple. I am not saying it is impossible, but it requires a few more things. At the beginning we create the required segments, and for the read and write buffers we remove from and add to new lists (for the write and large buffers). So a reset cannot simply reuse the buffers: we also have to put back the buffers that were pulled out for the write and large buffers. The LargeRecordHandler does the same thing. So in the reset method we should ensure that whatever was removed is added back, which means some of the locally created MemorySegment lists have to be moved to class level. Correct me if I am wrong here? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
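The bookkeeping sketched in this thread, keeping class-level track of buffers pulled from the shared lists so that a reset() can put them back, can be modeled like this (a simplified illustration, not Flink's actual sorter code):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sorter buffer bookkeeping: buffers removed from the
// shared list are remembered at class level, so reset() can restore
// the original state instead of reallocating on the next iteration.
class ResettableBuffers {
    private final List<Object> readBuffers = new ArrayList<>();
    private final List<Object> inUse = new ArrayList<>();

    ResettableBuffers(int n) {
        for (int i = 0; i < n; i++) {
            readBuffers.add(new Object());
        }
    }

    // Pull a buffer for writing; record it so reset() can return it.
    Object take() {
        Object b = readBuffers.remove(readBuffers.size() - 1);
        inUse.add(b);
        return b;
    }

    // Put every pulled buffer back, making the instance reusable.
    void reset() {
        readBuffers.addAll(inUse);
        inUse.clear();
    }

    int available() {
        return readBuffers.size();
    }
}
```

Without the class-level inUse list, a reset could not know which buffers were pulled out, which is the difficulty the comment describes.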
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15475975#comment-15475975 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~ggevay] Thanks for the inputs. Let me look at that as well. I am just trying to make things work; I can make a PR once I feel it is good, maybe early next week. This whole week was busy with some personal things, so I could not complete it on time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15470419#comment-15470419 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- I am able to handle the cases for the iterative jobs. But the LargeRecordHandlers are much trickier; I am checking that part and will get back on it. The current design is that a MemoryAllocator is passed to the sorters, and the allocator holds pre-created memory segments. If the memory allocator is created by an iterative task, we ensure those segments are not released directly to the MemoryManager and retain them until the iterative task receives the termination signal. For normal batch tasks, the memory allocators are not kept for further iterations, so we close them out. The sorters create read buffers, write buffers and large buffers, all statically sized. But inside the LargeRecordHandler there is some dynamic logic to decide the number of segments needed for keys and records. Will get back on this. Any suggestions/feedback here? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456065#comment-15456065 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- Reading the code, I think I need to update the doc on how the memory pages can be allocated for each task. I will do some tests first and then update the doc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4461) Ensure all the classes are tagged with suitable annotations
[ https://issues.apache.org/jira/browse/FLINK-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-4461: -- Attachment: FLINK_annotations.xlsx Attaching a list of public classes and interfaces that have no annotations attached. It may be worth fixing at least the truly public classes that are missing annotations. > Ensure all the classes are tagged with suitable annotations > --- > > Key: FLINK-4461 > URL: https://issues.apache.org/jira/browse/FLINK-4461 > Project: Flink > Issue Type: Improvement >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > Attachments: FLINK_annotations.xlsx > > > Currently in Flink we have three annotations: > Public > PublicEvolving > Internal. > But some classes, though they are public, are not tagged. They may be > advanced features, but tagging them would still help users know > which are public-facing and which are internal APIs/interfaces. > I ran a sample util in the streaming-java package and got these: > {code} > class org.apache.flink.streaming.runtime.operators.CheckpointCommitter > class > org.apache.flink.streaming.api.functions.source.FileMonitoringFunction$WatchType > interface org.apache.flink.streaming.api.functions.TimestampExtractor > class > org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows > class org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet > class org.apache.flink.streaming.api.windowing.triggers.TriggerResult > class > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor > class org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator > class > org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink$ExactlyOnceState > interface > org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks > class > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows > 
interface > org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction > interface > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet$MergeFunction > class org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider > class > org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema > class org.apache.flink.streaming.api.functions.source.FileReadFunction > class > org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows > class org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask > class org.apache.flink.streaming.api.functions.source.FileMonitoringFunction > class org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput > class org.apache.flink.streaming.api.functions.IngestionTimeExtractor > class > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction > interface org.apache.flink.streaming.api.functions.TimestampAssigner > class org.apache.flink.streaming.api.operators.StoppableStreamSource > class org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction > class org.apache.flink.streaming.util.HDFSCopyToLocal > class > org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator > class org.apache.flink.streaming.api.collector.selector.DirectedOutput > class org.apache.flink.streaming.runtime.tasks.TimeServiceProvider > class 
org.apache.flink.streaming.util.HDFSCopyFromLocal > class > org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows > {code} > These classes are simply not tagged. In the above example TimeStampAssigner > should fall in @Public tag I believe. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
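The kind of reflective audit mentioned above ("a sample util in streaming-java") can be sketched as follows. The nested annotations below are stand-ins for Flink's real org.apache.flink.annotation types, and the two sample types are purely illustrative:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationAudit {

    // Stand-ins for Flink's org.apache.flink.annotation.{Public, PublicEvolving, Internal}.
    // RUNTIME retention is what makes the reflective check below possible.
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface Public {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface PublicEvolving {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface Internal {}

    @Public
    interface TimestampAssigner {}        // tagged: user-facing API

    static class MergingWindowSet {}      // untagged: what the audit should flag

    /** Returns true if the class carries one of the three audience annotations. */
    public static boolean isTagged(Class<?> clazz) {
        return clazz.isAnnotationPresent(Public.class)
                || clazz.isAnnotationPresent(PublicEvolving.class)
                || clazz.isAnnotationPresent(Internal.class);
    }
}
```

Running `isTagged` over every public type of a package would produce exactly the kind of list attached to the issue.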
[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-3322: -- Attachment: FLINK-3322.docx A simple doc explaining how we can approach this JIRA. I am not sure I have covered all the cases, but from one of Stephan's older comments the idea looks pretty similar. I went through the code first to understand how things work. Feedback/comments are welcome. I am also not sure about the future plans for the memory manager, so it would be great if we could align with those. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
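The pooling idea behind this issue - returning released segments to a pool instead of leaving them for the GC to reclaim at every superstep - can be sketched like this; `SegmentPool` and its use of `ByteBuffer` as a stand-in for Flink's `MemorySegment` are assumptions for illustration:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

/** Sketch: pool released memory segments rather than leaving them to the GC. */
public class SegmentPool {
    private final ArrayBlockingQueue<ByteBuffer> pool;
    private final int segmentSize;

    public SegmentPool(int maxPooledSegments, int segmentSize) {
        this.pool = new ArrayBlockingQueue<>(maxPooledSegments);
        this.segmentSize = segmentSize;
    }

    /** Reuse a pooled segment if one is available; allocate lazily otherwise. */
    public ByteBuffer request() {
        ByteBuffer seg = pool.poll();
        return seg != null ? seg : ByteBuffer.allocate(segmentSize);
    }

    /** Return a segment to the pool; it is dropped (to the GC) only if the pool is full. */
    public void release(ByteBuffer seg) {
        seg.clear();
        pool.offer(seg);
    }
}
```

With this shape, an iterative job's supersteps recycle the same buffers instead of generating fresh garbage on each iteration.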
[jira] [Commented] (FLINK-4484) FLIP-10: Unify Savepoints and Checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15448004#comment-15448004 ] ramkrishna.s.vasudevan commented on FLINK-4484: --- I can help out in the sub tasks too, particularly with the unassigned ones. [~uce]? > FLIP-10: Unify Savepoints and Checkpoints > - > > Key: FLINK-4484 > URL: https://issues.apache.org/jira/browse/FLINK-4484 > Project: Flink > Issue Type: Improvement >Reporter: Ufuk Celebi > > Super issue to track progress for > [FLIP-10|https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438412#comment-15438412 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~StephanEwen] I was thinking of working on this. If you can walk me through the planned improvements to memory management, I can take this up. It is also fine if [~ggevay] is ready to volunteer for this. Let me know what you think, [~StephanEwen]. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4367) Offer separate API for watermark generation and timestamp extraction
[ https://issues.apache.org/jira/browse/FLINK-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15436334#comment-15436334 ] ramkrishna.s.vasudevan commented on FLINK-4367: --- So it is better to remove the TimestampAssigner interface from the two interfaces AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks? So for 2.0 we will do this, and for the older branch versions we will mark it as @Deprecated, noting that from 2.0 on they will no longer implement TimestampAssigner - [~rmetzger]? > Offer separate API for watermark generation and timestamp extraction > > > Key: FLINK-4367 > URL: https://issues.apache.org/jira/browse/FLINK-4367 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Affects Versions: 1.1.0 >Reporter: Robert Metzger > Fix For: 2.0.0 > > > Right now, the {{AssignerWithPunctuatedWatermarks}} and > {{AssignerWithPeriodicWatermarks}} interfaces also require implementing a > {{TimestampAssigner}}. > For cases where the source emits records with timestamps, its not necessary > to extract timestamps again from the records, we just want to generate > watermarks. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
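The split being discussed - watermark generation no longer forced to extend timestamp assignment - can be pictured with the sketch below. `WatermarkGenerator` and `OnMarker` are hypothetical names for illustration, not Flink's final API:

```java
public class WatermarkSplit {

    /** Timestamp extraction (the role of Flink's TimestampAssigner). */
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long previousTimestamp);
    }

    /** Watermark generation only: when the source already attaches timestamps,
     *  an implementation never has to re-extract them. Illustrative shape. */
    interface WatermarkGenerator<T> {
        /** Returns a new watermark, or Long.MIN_VALUE for "no watermark yet". */
        long checkAndGetNextWatermark(T lastElement, long attachedTimestamp);
    }

    /** Example: emit a watermark only on an end-of-batch marker record. */
    static class OnMarker implements WatermarkGenerator<String> {
        public long checkAndGetNextWatermark(String lastElement, long attachedTimestamp) {
            return "END".equals(lastElement) ? attachedTimestamp : Long.MIN_VALUE;
        }
    }
}
```

Note that `OnMarker` compiles without any `extractTimestamp` method, which is exactly the decoupling the issue asks for.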
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15436212#comment-15436212 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- I can work on this and come up with a design doc so that iterative jobs are smart enough not to request memory segments afresh every time. [~StephanEwen]? > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4462) Add RUN_TIME retention policy for all the flink annotations
ramkrishna.s.vasudevan created FLINK-4462: - Summary: Add RUN_TIME retention policy for all the flink annotations Key: FLINK-4462 URL: https://issues.apache.org/jira/browse/FLINK-4462 Project: Flink Issue Type: Improvement Reporter: ramkrishna.s.vasudevan Assignee: ramkrishna.s.vasudevan Priority: Minor It would be better to add the RUNTIME retention policy to the Flink annotations, so that utilities/tests can be added to verify that all classes/interfaces are tagged with the proper annotations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
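Why RUNTIME retention matters here: an annotation with the default CLASS retention is present in the bytecode but invisible to reflection, so a reflective audit/test cannot see it. A minimal demonstration (annotation names are hypothetical; Flink's real ones live in org.apache.flink.annotation):

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class RetentionDemo {
    // CLASS retention (the default): kept in the bytecode, discarded at runtime.
    @Retention(RetentionPolicy.CLASS)   @interface ClassRetained {}
    // RUNTIME retention: readable via reflection, so audits/tests can check it.
    @Retention(RetentionPolicy.RUNTIME) @interface RuntimeRetained {}

    @ClassRetained   static class A {}
    @RuntimeRetained static class B {}

    public static boolean visible(Class<?> type, Class<? extends Annotation> anno) {
        return type.isAnnotationPresent(anno);
    }
}
```

Here `visible(A.class, ClassRetained.class)` is false while `visible(B.class, RuntimeRetained.class)` is true, which is why the issue proposes RUNTIME retention.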
[jira] [Created] (FLINK-4461) Ensure all the classes are tagged with suitable annotations
ramkrishna.s.vasudevan created FLINK-4461: - Summary: Ensure all the classes are tagged with suitable annotations Key: FLINK-4461 URL: https://issues.apache.org/jira/browse/FLINK-4461 Project: Flink Issue Type: Improvement Reporter: ramkrishna.s.vasudevan Currently in Flink we have three annotations Public PublicEvolving Internal. But some of the classes though they are public they are not tagged. It may be even advanced features but still tagging them could help the user to know which are public facing and which are Internal API/interfaces. I just ran a sample util in streaming-java package and I got these {code} class org.apache.flink.streaming.runtime.operators.CheckpointCommitter class org.apache.flink.streaming.api.functions.source.FileMonitoringFunction$WatchType interface org.apache.flink.streaming.api.functions.TimestampExtractor class org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows class org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet class org.apache.flink.streaming.api.windowing.triggers.TriggerResult class org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor class org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator class org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink$ExactlyOnceState interface org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks class org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows interface org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks class org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction interface org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet$MergeFunction class org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider class org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema class 
org.apache.flink.streaming.api.functions.source.FileReadFunction class org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows class org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask class org.apache.flink.streaming.api.functions.source.FileMonitoringFunction class org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput class org.apache.flink.streaming.api.functions.IngestionTimeExtractor class org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator class org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction class org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction class org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction interface org.apache.flink.streaming.api.functions.TimestampAssigner class org.apache.flink.streaming.api.operators.StoppableStreamSource class org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink class org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction class org.apache.flink.streaming.util.HDFSCopyToLocal class org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator class org.apache.flink.streaming.api.collector.selector.DirectedOutput class org.apache.flink.streaming.runtime.tasks.TimeServiceProvider class org.apache.flink.streaming.util.HDFSCopyFromLocal class org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows {code} These classes are simply not tagged. In the above example TimeStampAssigner should fall in @Public tag I believe. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4461) Ensure all the classes are tagged with suitable annotations
[ https://issues.apache.org/jira/browse/FLINK-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan reassigned FLINK-4461: - Assignee: ramkrishna.s.vasudevan > Ensure all the classes are tagged with suitable annotations > --- > > Key: FLINK-4461 > URL: https://issues.apache.org/jira/browse/FLINK-4461 > Project: Flink > Issue Type: Improvement >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > > Currently in Flink we have three annotations > Public > PublicEvolving > Internal. > But some of the classes though they are public they are not tagged. It may be > even advanced features but still tagging them could help the user to know > which are public facing and which are Internal API/interfaces. > I just ran a sample util in streaming-java package and I got these > {code} > class org.apache.flink.streaming.runtime.operators.CheckpointCommitter > class > org.apache.flink.streaming.api.functions.source.FileMonitoringFunction$WatchType > interface org.apache.flink.streaming.api.functions.TimestampExtractor > class > org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows > class org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet > class org.apache.flink.streaming.api.windowing.triggers.TriggerResult > class > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor > class org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator > class > org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink$ExactlyOnceState > interface > org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks > class > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows > interface > org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction > interface > 
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet$MergeFunction > class org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider > class > org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema > class org.apache.flink.streaming.api.functions.source.FileReadFunction > class > org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows > class org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask > class org.apache.flink.streaming.api.functions.source.FileMonitoringFunction > class org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput > class org.apache.flink.streaming.api.functions.IngestionTimeExtractor > class > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction > interface org.apache.flink.streaming.api.functions.TimestampAssigner > class org.apache.flink.streaming.api.operators.StoppableStreamSource > class org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction > class org.apache.flink.streaming.util.HDFSCopyToLocal > class > org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator > class org.apache.flink.streaming.api.collector.selector.DirectedOutput > class org.apache.flink.streaming.runtime.tasks.TimeServiceProvider > class org.apache.flink.streaming.util.HDFSCopyFromLocal > class > org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows > {code} > These classes are simply not tagged. 
In the above example TimeStampAssigner > should fall in @Public tag I believe. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433273#comment-15433273 ] ramkrishna.s.vasudevan commented on FLINK-4437: --- bq.I wonder whether we should just simply guard the entire method with a lock, and not release it in-between. Would be much simpler, and probably okay as well. +1. I was actually planning to raise this. Is there any minimum duration that we need to set for the interval between two checkpoints? > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
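The fix suggested in the comment - guarding the whole method with one lock instead of releasing it in between - can be sketched as follows; the class and field names are simplified stand-ins for the CheckpointCoordinator logic, not the actual Flink code:

```java
/** Sketch: do the min-pause check and the timestamp update as one atomic step. */
public class CheckpointGate {
    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;
    // Half of Long.MIN_VALUE so the addition below cannot overflow.
    private long lastTriggeredCheckpoint = Long.MIN_VALUE / 2;

    public CheckpointGate(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Check-then-act under a single lock: two concurrent triggers can no longer
     *  both read a stale lastTriggeredCheckpoint and both pass the check. */
    public boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false; // too soon after the previous checkpoint
            }
            lastTriggeredCheckpoint = timestamp;
            return true;
        }
    }
}
```

Without the lock, two threads evaluating the condition concurrently could both see the old value and both proceed - the lost-update scenario described in the issue.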
[jira] [Assigned] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp
[ https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan reassigned FLINK-4417: - Assignee: ramkrishna.s.vasudevan > Checkpoints should be subsumed by CheckpointID not, by timestamp > > > Key: FLINK-4417 > URL: https://issues.apache.org/jira/browse/FLINK-4417 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > Fix For: 1.2.0 > > > Since the system clocks cannot be expected to be stable/monotonous, the > subsumption logic in the checkpoint coordinator should not decide which > checkpoint is "newer" based on the system time. > The checkpoint ID is guaranteed to be strictly monotonously increasing. It is > a better measure to decide which checkpoint is newer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
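The subsumption rule proposed above - "newer" decided by checkpoint ID rather than wall-clock time - can be sketched like this (class shape is illustrative, not Flink's CompletedCheckpoint):

```java
import java.util.Comparator;

/** Sketch: order checkpoints by their strictly increasing ID, not by timestamp. */
public class CheckpointOrder {
    static final class Checkpoint {
        final long checkpointId;
        final long timestamp; // wall clock: NOT guaranteed monotonic
        Checkpoint(long checkpointId, long timestamp) {
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
        }
    }

    /** "Newer" means a higher checkpoint ID. */
    static final Comparator<Checkpoint> BY_ID =
            Comparator.comparingLong(c -> c.checkpointId);

    public static boolean subsumes(Checkpoint candidate, Checkpoint existing) {
        return BY_ID.compare(candidate, existing) > 0;
    }
}
```

The interesting case is a clock jumping backwards: a checkpoint with a higher ID but a *lower* timestamp must still subsume the older one, which the ID-based rule handles and a timestamp-based rule would not.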
[jira] [Commented] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp
[ https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428450#comment-15428450 ] ramkrishna.s.vasudevan commented on FLINK-4417: --- Thank you. Will start with this early next week. > Checkpoints should be subsumed by CheckpointID not, by timestamp > > > Key: FLINK-4417 > URL: https://issues.apache.org/jira/browse/FLINK-4417 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen > Fix For: 1.2.0 > > > Since the system clocks cannot be expected to be stable/monotonous, the > subsumption logic in the checkpoint coordinator should not decide which > checkpoint is "newer" based on the system time. > The checkpoint ID is guaranteed to be strictly monotonously increasing. It is > a better measure to decide which checkpoint is newer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp
[ https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427688#comment-15427688 ] ramkrishna.s.vasudevan commented on FLINK-4417: --- [~StephanEwen] Is it ok if I can take this up? If you have already started working on this - then no problem. > Checkpoints should be subsumed by CheckpointID not, by timestamp > > > Key: FLINK-4417 > URL: https://issues.apache.org/jira/browse/FLINK-4417 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Stephan Ewen > Fix For: 1.2.0 > > > Since the system clocks cannot be expected to be stable/monotonous, the > subsumption logic in the checkpoint coordinator should not decide which > checkpoint is "newer" based on the system time. > The checkpoint ID is guaranteed to be strictly monotonously increasing. It is > a better measure to decide which checkpoint is newer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4388) Race condition during initialization of MemorySegmentFactory
[ https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419861#comment-15419861 ] ramkrishna.s.vasudevan commented on FLINK-4388: --- [~StephanEwen] I think it happened because I would have just seen the JIRA even before you had assigned it. I should have refreshed the page before commenting. My bad. No problem. Just wanted to see if I could be of help here. > Race condition during initialization of MemorySegmentFactory > > > Key: FLINK-4388 > URL: https://issues.apache.org/jira/browse/FLINK-4388 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.1.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.2 > > > The check whether the factory is initialized, and the actual initialization > are not atomic. When starting multiple TaskManagers, this can lead to races > and exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4388) Race condition during initialization of MemorySegmentFactory
[ https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15418777#comment-15418777 ] ramkrishna.s.vasudevan commented on FLINK-4388: --- When I commented there I saw it as 'Unassigned' and hence did that comment. No problem. Thank you. > Race condition during initialization of MemorySegmentFactory > > > Key: FLINK-4388 > URL: https://issues.apache.org/jira/browse/FLINK-4388 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.1.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.2 > > > The check whether the factory is initialized, and the actual initialization > are not atomic. When starting multiple TaskManagers, this can lead to races > and exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4388) Race condition during initialization of MemorySegmentFactory
[ https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15418752#comment-15418752 ] ramkrishna.s.vasudevan commented on FLINK-4388: --- I can take this up and come up with a patch early next week, if no one is working on this. Thanks. > Race condition during initialization of MemorySegmentFactory > > > Key: FLINK-4388 > URL: https://issues.apache.org/jira/browse/FLINK-4388 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.1.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.2 > > > The check whether the factory is initialized, and the actual initialization > are not atomic. When starting multiple TaskManagers, this can lead to races > and exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
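The race described in the issue is a classic non-atomic check-then-initialize. One way to make the check and the initialization a single atomic step is a class-level lock, sketched below (simplified stand-in, not the actual Flink fix):

```java
/** Sketch: atomic "initialize if not yet initialized" for a shared factory. */
public class SegmentFactory {
    private static volatile Integer pageSize; // null means "not initialized"

    public static void initializeIfNotInitialized(int requestedPageSize) {
        synchronized (SegmentFactory.class) {
            // Check and initialize under one lock: two TaskManagers starting
            // concurrently can no longer both observe "uninitialized" and race.
            if (pageSize == null) {
                pageSize = requestedPageSize;
            } else if (pageSize != requestedPageSize) {
                throw new IllegalStateException(
                        "factory already initialized with page size " + pageSize);
            }
        }
    }

    public static int getPageSize() {
        Integer p = pageSize;
        if (p == null) {
            throw new IllegalStateException("not initialized");
        }
        return p;
    }
}
```

Re-initializing with the same value is an idempotent no-op; only a conflicting value fails, so concurrent TaskManager startups become safe.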
[jira] [Commented] (FLINK-4322) Unify CheckpointCoordinator and SavepointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410662#comment-15410662 ] ramkrishna.s.vasudevan commented on FLINK-4322: --- Ok, got it. I understand. Thanks. > Unify CheckpointCoordinator and SavepointCoordinator > > > Key: FLINK-4322 > URL: https://issues.apache.org/jira/browse/FLINK-4322 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > Fix For: 1.2.0 > > > The Checkpoint coordinator should have the functionality of both handling > checkpoints and savepoints. > The difference between checkpoints and savepoints is minimal: > - savepoints always write the root metadata of the checkpoint > - savepoints are always full (never incremental) > The commonalities are large > - jobs should be able to resume from checkpoint or savepoints > - jobs should fall back to the latest checkpoint or savepoint > This subsumes issue https://issues.apache.org/jira/browse/FLINK-3397 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4322) Unify CheckpointCoordinator and SavepointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409799#comment-15409799 ] ramkrishna.s.vasudevan commented on FLINK-4322: --- bq.jobs should be able to resume from checkpoint or savepoints bq.jobs should fall back to the latest checkpoint or savepoint Yes, this is what FLINK-3397 tries to achieve too. [~uce] had given some comments and I updated the document, but have not yet uploaded it to that JIRA. He also mentioned the job-to-savepoint mapping; I raised some comments and doubts about that in the other JIRA. As he is very busy, I am just waiting for his input. Since this and FLINK-3397 are related, I am happy to work on both, and will do so once we arrive at a consensus on the design. Thanks. > Unify CheckpointCoordinator and SavepointCoordinator > > > Key: FLINK-4322 > URL: https://issues.apache.org/jira/browse/FLINK-4322 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > Fix For: 1.2.0 > > > The Checkpoint coordinator should have the functionality of both handling > checkpoints and savepoints. > The difference between checkpoints and savepoints is minimal: > - savepoints always write the root metadata of the checkpoint > - savepoints are always full (never incremental) > The commonalities are large > - jobs should be able to resume from checkpoint or savepoints > - jobs should fall back to the latest checkpoint or savepoint > This subsumes issue https://issues.apache.org/jira/browse/FLINK-3397 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"
[ https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan reassigned FLINK-4253: - Assignee: ramkrishna.s.vasudevan > Rename "recovery.mode" config key to "high-availability" > > > Key: FLINK-4253 > URL: https://issues.apache.org/jira/browse/FLINK-4253 > Project: Flink > Issue Type: Improvement >Reporter: Ufuk Celebi >Assignee: ramkrishna.s.vasudevan > > Currently, HA is configured via the following configuration keys: > {code} > recovery.mode: STANDALONE // No high availability (HA) > recovery.mode: ZOOKEEPER // HA > {code} > This could be more straight forward by simply renaming the key to > {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We > already have standalone cluster mode. > {code} > high-availability: NONE // No HA > high-availability: ZOOKEEPER // HA via ZooKeeper > {code} > The {{recovery.mode}} configuration keys would have to be deprecated before > completely removing them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
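The deprecation path described in the issue - new `high-availability` key wins, old `recovery.mode` still honored with `STANDALONE` mapped to `NONE` - can be sketched as below; the method and resolution logic are assumptions for illustration, not Flink's actual implementation:

```java
import java.util.Map;

/** Sketch: resolve the HA mode from the new key with a deprecated-key fallback. */
public class HaConfig {
    static final String KEY_NEW = "high-availability";
    static final String KEY_DEPRECATED = "recovery.mode";

    public static String resolveHaMode(Map<String, String> conf) {
        // New key takes precedence when both are set.
        String v = conf.get(KEY_NEW);
        if (v != null) {
            return v.toUpperCase();
        }
        // Deprecated key: the overloaded STANDALONE value maps to NONE.
        String old = conf.get(KEY_DEPRECATED);
        if (old == null) {
            return "NONE";
        }
        return "STANDALONE".equalsIgnoreCase(old) ? "NONE" : old.toUpperCase();
    }
}
```

In a real deprecation, reading the old key would also log a warning pointing users at the new key before it is removed.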
[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"
[ https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409453#comment-15409453 ] ramkrishna.s.vasudevan commented on FLINK-4253: --- I can take this up. > Rename "recovery.mode" config key to "high-availability" > > > Key: FLINK-4253 > URL: https://issues.apache.org/jira/browse/FLINK-4253 > Project: Flink > Issue Type: Improvement >Reporter: Ufuk Celebi >Assignee: ramkrishna.s.vasudevan > > Currently, HA is configured via the following configuration keys: > {code} > recovery.mode: STANDALONE // No high availability (HA) > recovery.mode: ZOOKEEPER // HA > {code} > This could be more straight forward by simply renaming the key to > {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We > already have standalone cluster mode. > {code} > high-availability: NONE // No HA > high-availability: ZOOKEEPER // HA via ZooKeeper > {code} > The {{recovery.mode}} configuration keys would have to be deprecated before > completely removing them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409350#comment-15409350 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- Updated the PR, please check. [~StephanEwen], thanks for the comments. bq. I think before we change memory allocation behavior, we should discuss that on the Flink mailing list. Are we going to send a mail to the community and decide whether we can allow preallocation for OFFHEAP buffers? > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408274#comment-15408274 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- I created the log-related message and the doc change. So what info should we give in the doc - will the following be enough? {code} +- `taskmanager.memory.preallocate`: Can be either `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, it is advised to also set this option to `true`, because when it is +`false`, cleanup of the allocated off-heap memory only kicks in once the configured JVM parameter MaxDirectMemorySize is reached, which triggers a full GC. {code} I can submit a PR after this. bq.If a task requests more memory than the system is configured for, it will throw an error. Thanks [~StephanEwen]. > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15404384#comment-15404384 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- I can do this Log update and document update tomorrow and open a PR for the same. > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15404378#comment-15404378 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- bq. We cannot really manually release the memory when freeing the segment, because the ByteBuffer wrapper object may still exist. Ideally when we are going to pool we won't try to free the memory - so the ByteBuffer wrapper will exist and that is what we will pool. I think once we do this we won't do segment.free() on that buffer and we will allow the address to remain valid - if I am not wrong. Just a question: in the case of {{preallocation = true}}, what happens if the number of requests is more than the initial size? So we consume all the buffers in the pool, but new requests won't be served? bq.What we can do now, is to discourage the use of off-heap memory with preallocation set to false. For example, print a prominent warning and add a hint to the documentation. Maybe for now we can do that. bq. I think before we change memory allocation behavior, we should discuss that on the Flink mailing list. Ok, sounds like a plan. So once we discuss it, I think we can go with the lazy allocation pooling model, and that should be beneficial, because the current pooling is anyway done with an unbounded queue and similarly it can be done here too. One thing to note is that even with pooling, if MaxDirectMemorySize is still not configured right, we will not be able to work with off-heap buffers. The only thing is we won't grow infinitely. 
> Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
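To make the {{preallocation = true}} question above concrete, here is a minimal sketch (a hypothetical class, not Flink's actual MemoryManager): a fixed-size pool where all direct buffers are allocated up front, so once the pool is drained, further requests simply cannot be served until a buffer is returned.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch of a preallocated fixed-size direct-buffer pool.
// All buffers are created eagerly; requests beyond the pool size return
// null until a buffer is released back, mirroring the question above.
class PreallocatedBufferPool {
    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();

    PreallocatedBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Returns a pooled buffer, or null when the pool is exhausted. */
    ByteBuffer request() {
        return available.poll();
    }

    /** Clears the buffer and puts it back for reuse. */
    void release(ByteBuffer buffer) {
        buffer.clear();
        available.add(buffer);
    }

    public static void main(String[] args) {
        PreallocatedBufferPool pool = new PreallocatedBufferPool(2, 32 * 1024);
        System.out.println("first request served: " + (pool.request() != null));
        System.out.println("second request served: " + (pool.request() != null));
        System.out.println("third request served: " + (pool.request() != null));
    }
}
```

Under this model the answer to the question is: once all pooled buffers are handed out, a new request blocks or fails until some task releases one; the pool never grows.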
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403843#comment-15403843 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- Ok. bq.If we use pooling, I think we should just follow the exact same pooling pattern as the "preallocation" case, only make the allocation lazy. Am fine with the simple pattern. Can I have a go at it? > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403533#comment-15403533 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- bq.So, another option to fix this would be to set the MaxDirectMemorySize parameter properly. Yes, I agree. But when the job runs in a multi-tenant system where other processes are running that are also memory intensive, configuring this may not always be easy. I agree it is a direct way to solve the problem if one really knows their memory needs and requirements. Regarding pooling, some techniques that can be followed (speaking from how we have used them in our projects): -> Just pool the off-heap byte buffers (all are fixed-size buffers). Once the usage is over, put them back into the pool. If the pool is empty we need to wait (a blocking call, which may not be acceptable). So either create on-heap buffers, which may not be right in this use case (but is ideally safe), or allocate off-heap buffers dynamically and warn the user that the pool size has to be increased because dynamic off-heap buffers are being allocated frequently. -> Another way to avoid fragmentation could be chunking. I can see that by default we create 32K-sized buffers (page size). Instead we could create, say, 2MB-sized off-heap buffers and keep handing out 32K-sized slices at increasing offsets on every request. Again, all the 2MB-sized buffers will be pooled, but once a buffer is requested from the pool we try to allocate 32K slices from it. Once a buffer is full, or the next request cannot be contained in it, move on to the next buffer. In turn we can pool these chunks also, so that once a chunk is done we put it back into a chunk pool and reuse it. But this needs some knowledge of when the task has exactly completed the usage of that chunk. There should not be any references to it. 
> Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
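The chunking idea described above can be sketched roughly like this (a hypothetical class, not Flink code; the sizes come from the comment: 2 MB chunks sliced into 32K pages):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the chunking technique described above:
// allocate large off-heap chunks and hand out fixed-size slices,
// so the number of allocateDirect calls stays small and fragmentation
// is reduced.
class ChunkedPageAllocator {
    static final int CHUNK_SIZE = 2 * 1024 * 1024; // 2 MB chunk
    static final int PAGE_SIZE = 32 * 1024;        // 32K page (Flink's default page size)

    private ByteBuffer currentChunk;
    private int offset = CHUNK_SIZE; // forces a chunk allocation on the first request

    ByteBuffer allocatePage() {
        if (offset + PAGE_SIZE > CHUNK_SIZE) {
            // Current chunk cannot contain the next page: move to a fresh chunk.
            currentChunk = ByteBuffer.allocateDirect(CHUNK_SIZE);
            offset = 0;
        }
        ByteBuffer page = currentChunk.duplicate();
        page.position(offset);
        page.limit(offset + PAGE_SIZE);
        offset += PAGE_SIZE;
        return page.slice(); // independent 32K view over one region of the chunk
    }
}
```

A real implementation would also have to track when every slice of a chunk has been released before the chunk can go back to a chunk pool, which is exactly the bookkeeping caveat raised at the end of the comment.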
[jira] [Commented] (FLINK-3674) Add an interface for EventTime aware User Function
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400422#comment-15400422 ] ramkrishna.s.vasudevan commented on FLINK-3674: --- Thanks for the comments/feedback. I could see that the initial thought was to just expose some interface so that UDFs can implement it and get a callback on watermarks. Later a Timer-based interface was talked about. So my initial thought was to just do the changes so that Timer is exposed as an interface based on EventTimeFunction. The idea was not to make this PR a final one but to bring in the discussion. If the practice in Flink is to have design-doc-based discussions, I can ensure that for such PRs I will first add a doc and then a PR. This happened with another PR also, so I will learn and change my methodology. bq.Right now, WindowOperator has a custom implementation of this. This should be taken as the basis for a generic implementation than can then also be exposed to users. My thought of exposing the Timer as a first step and then building it based on feedback was because of this. Since the Timer in WindowOperator is a custom one, I thought first converting it to an interface would help us add on and see what we can do to make it generic. > Add an interface for EventTime aware User Function > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. 
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
>
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399085#comment-15399085 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- [~mxm] Thanks for the comment. My comment was a bit vague as I added it while on travel. Let me try to explain based on what I see in the code. If I am missing something or wrong, please do correct me. The MemoryManager manages both heap and off-heap memory segments.
{code}
@Override
HybridMemorySegment allocateNewSegment(Object owner) {
    ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
    return HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(memory, owner);
}

@Override
HybridMemorySegment requestSegmentFromPool(Object owner) {
    ByteBuffer buf = availableMemory.remove();
    return HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(buf, owner);
}

@Override
void returnSegmentToPool(MemorySegment segment) {
    if (segment.getClass() == HybridMemorySegment.class) {
        HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
        ByteBuffer buf = hybridSegment.getOffHeapBuffer();
        availableMemory.add(buf);
        hybridSegment.free();
    } else {
        throw new IllegalArgumentException("Memory segment is not a " + HeapMemorySegment.class.getSimpleName());
    }
}
{code}
If you see the usage of the above APIs:
{code}
if (isPreAllocated) {
    for (int i = numPages; i > 0; i--) {
        MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
        target.add(segment);
        segmentsForOwner.add(segment);
    }
} else {
    for (int i = numPages; i > 0; i--) {
        MemorySegment segment = memoryPool.allocateNewSegment(owner);
        target.add(segment);
        segmentsForOwner.add(segment);
    }
    numNonAllocatedPages -= numPages;
}
{code}
So if preallocation is enabled the memory buffer is requested from the pool; otherwise a new segment is allocated every time. 
Coming to the release of these buffers:
{code}
if (isPreAllocated) {
    // release the memory in any case
    memoryPool.returnSegmentToPool(segment);
} else {
    segment.free();
    numNonAllocatedPages++;
}
{code}
Again, only if preallocation is enabled do we return the segment to the pool. Yes, as you clearly pointed out, it is just dynamic allocation that we do, and on memory manager shutdown we clear the allocated buffers. But for off-heap this will not be enough, as the GC will not be able to garbage collect them unless a full GC happens. I would rather say it is better we do internal management of off-heap buffers. We should create a pool from which the buffers are allocated, and if the pool is of a fixed size and we have requests for more buffers than the size of the pool, we should allocate them on-heap only (if that is acceptable). Currently the memory management pool is backed by an ArrayDeque. We only set an initial size and I think it can grow beyond that too. So for off-heap buffers we should have a fixed-size pool, and as demand grows we should allocate a few buffers on-heap; once the pool is again able to offer buffers we use them. bq.I don't think just disallowing preallocation:false is a good fix. Yes, I agree. That is a hacky one. > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
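The fixed-size-pool-with-on-heap-fallback proposal above can be sketched as follows (a hypothetical class, not a patch against Flink's MemoryManager): direct buffers are allocated lazily up to a bound, and once the bound is reached further requests are served from the heap, so direct memory never grows without limit.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch: lazily allocate up to maxOffHeapBuffers direct
// buffers; beyond that, fall back to on-heap allocation so the process
// never exceeds its direct-memory budget.
class BoundedOffHeapPool {
    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();
    private final int maxOffHeapBuffers;
    private final int bufferSize;
    private int allocatedOffHeap = 0;

    BoundedOffHeapPool(int maxOffHeapBuffers, int bufferSize) {
        this.maxOffHeapBuffers = maxOffHeapBuffers;
        this.bufferSize = bufferSize;
    }

    ByteBuffer request() {
        ByteBuffer pooled = available.poll();
        if (pooled != null) {
            return pooled; // reuse a previously released off-heap buffer
        }
        if (allocatedOffHeap < maxOffHeapBuffers) {
            allocatedOffHeap++;
            return ByteBuffer.allocateDirect(bufferSize); // lazy, bounded
        }
        // Pool exhausted: hand out an on-heap buffer instead of growing
        // direct memory without bound.
        return ByteBuffer.allocate(bufferSize);
    }

    void release(ByteBuffer buffer) {
        if (buffer.isDirect()) { // only off-heap buffers are pooled
            buffer.clear();
            available.add(buffer);
        } // on-heap fallback buffers are simply left to the GC
    }
}
```

Note that even under this scheme MaxDirectMemorySize still has to accommodate maxOffHeapBuffers * bufferSize; the sketch only bounds growth, as the comment says.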
[jira] [Assigned] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan reassigned FLINK-4094: - Assignee: ramkrishna.s.vasudevan > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397634#comment-15397634 ] ramkrishna.s.vasudevan commented on FLINK-4094: --- Can I work on this? I saw the code - if preallocation is false and the memory type is OFFHEAP we should not be allowing that config. Maybe we can even disallow that combination, or if OFFHEAP is the type we always enforce preallocation. Or add our own internal off-heap buffer management that is not exposed to the user? Maybe that is redundant. Not allowing preallocation and creating off-heap buffers is very dangerous. > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
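The "disallow the combination" option above could be as simple as a startup check. A minimal sketch (hypothetical class and method, not existing Flink code):

```java
// Hypothetical startup validation for the config combination discussed
// above: reject off-heap memory without preallocation, since deallocation
// then depends on a full GC being triggered at MaxDirectMemorySize.
class MemoryConfigValidator {
    static void validate(boolean offHeap, boolean preallocate) {
        if (offHeap && !preallocate) {
            throw new IllegalArgumentException(
                "taskmanager.memory.off-heap=true requires "
                + "taskmanager.memory.preallocate=true");
        }
    }
}
```

A softer variant, as suggested in the earlier comments, would log a prominent warning instead of throwing.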
[jira] [Commented] (FLINK-3674) Add an interface for EventTime aware User Function
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397018#comment-15397018 ] ramkrishna.s.vasudevan commented on FLINK-3674: --- [~aljoscha] Any feedback/suggestions here? > Add an interface for EventTime aware User Function > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
>
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint
[ https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391103#comment-15391103 ] ramkrishna.s.vasudevan commented on FLINK-3397: --- [~uce] bq.I fear though that these changes require some more consideration about how savepoints are stored/accessed. They are currently mostly independent of the job from which they were created. I read through the code. The CheckpointIdCounter (ZooKeeperCheckpointIDCounter) tries to create a counter per job id using the job id path in ZooKeeper. So that means savepoints and checkpoints are stored and accessed per job only, right? If this is wrong, then I am missing something. Please correct me if I am wrong. > Failed streaming jobs should fall back to the most recent checkpoint/savepoint > -- > > Key: FLINK-3397 > URL: https://issues.apache.org/jira/browse/FLINK-3397 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.0.0 >Reporter: Gyula Fora >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Attachments: FLINK-3397.pdf > > > The current fallback behaviour in case of a streaming job failure is slightly > counterintuitive: > If a job fails it will fall back to the most recent checkpoint (if any) even > if there were more recent savepoint taken. This means that savepoints are not > regarded as checkpoints by the system only points from where a job can be > manually restarted. > I suggest to change this so that savepoints are also regarded as checkpoints > in case of a failure and they will also be used to automatically restore the > streaming job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint
[ https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15389062#comment-15389062 ] ramkrishna.s.vasudevan commented on FLINK-3397: --- [~uce] A gentle ping !!! :) > Failed streaming jobs should fall back to the most recent checkpoint/savepoint > -- > > Key: FLINK-3397 > URL: https://issues.apache.org/jira/browse/FLINK-3397 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.0.0 >Reporter: Gyula Fora >Assignee: ramkrishna.s.vasudevan >Priority: Minor > Attachments: FLINK-3397.pdf > > > The current fallback behaviour in case of a streaming job failure is slightly > counterintuitive: > If a job fails it will fall back to the most recent checkpoint (if any) even > if there were more recent savepoint taken. This means that savepoints are not > regarded as checkpoints by the system only points from where a job can be > manually restarted. > I suggest to change this so that savepoints are also regarded as checkpoints > in case of a failure and they will also be used to automatically restore the > streaming job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3674) Add an interface for EventTime aware User Function
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan reassigned FLINK-3674: - Assignee: ramkrishna.s.vasudevan > Add an interface for EventTime aware User Function > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: ramkrishna.s.vasudevan > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)