[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112770#comment-16112770 ] ASF GitHub Bot commented on FLINK-5486: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4356 ``` Tests run: 6, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 91.985 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase testExternalizedFullRocksDBCheckpointsStandalone(org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase) Time elapsed: 29.976 sec <<< ERROR! java.io.IOException: java.lang.Exception: Failed to complete checkpoint at org.apache.flink.runtime.testingUtils.TestingCluster.requestCheckpoint(TestingCluster.scala:399) at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedCheckpoints(ExternalizedCheckpointITCase.java:218) at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedFullRocksDBCheckpointsStandalone(ExternalizedCheckpointITCase.java:78) Caused by: java.lang.Exception: Failed to complete checkpoint at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:375) at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:358) at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) at akka.dispatch.OnComplete.internal(Future.scala:247) at akka.dispatch.OnComplete.internal(Future.scala:245) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ``` > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16131628#comment-16131628 ] ASF GitHub Bot commented on FLINK-5486: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4356 Run QA again ? > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16131631#comment-16131631 ] ASF GitHub Bot commented on FLINK-5486: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4356 Okay. Ted. I will trigger the QA soon. > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134044#comment-16134044 ] ASF GitHub Bot commented on FLINK-5486: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4356 triggering now > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649306#comment-16649306 ] ASF GitHub Bot commented on FLINK-5486: --- GJL closed pull request #4356: [FLINK-5486] Fix lacking of synchronization in BucketingSink#handleRe… URL: https://github.com/apache/flink/pull/4356 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 55400c6ce19..bd5058a4fdf 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -719,8 +719,9 @@ private void handleRestoredBucketState(State restoredState) { bucketState.isWriterOpen = false; handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); - - bucketState.pendingFilesPerCheckpoint.clear(); + synchronized (bucketState.pendingFilesPerCheckpoint) { + bucketState.pendingFilesPerCheckpoint.clear(); + } } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649305#comment-16649305 ] ASF GitHub Bot commented on FLINK-5486: --- GJL commented on issue #4356: [FLINK-5486] Fix lacking of synchronization in BucketingSink#handleRe… URL: https://github.com/apache/flink/pull/4356#issuecomment-429608448 I am closing this PR because [FLINK-5486](https://issues.apache.org/jira/browse/FLINK-5486?focusedCommentId=16649304&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16649304) got closed. Please reopen the ticket/PR if you disagree. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Major > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503691#comment-16503691 ] Gary Yao commented on FLINK-5486: - I don't see why this is needed. Flink callbacks are generally not invoked concurrently. > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Major > Fix For: 1.3.4 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351986#comment-16351986 ] Ted Yu commented on FLINK-5486: --- Can this get more review, please ? > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Major > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199255#comment-16199255 ] ASF GitHub Bot commented on FLINK-5486: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4356 retest this please > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199652#comment-16199652 ] ASF GitHub Bot commented on FLINK-5486: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4356 OKay. Thanks , Ted. Will retest again soon enough./ > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210488#comment-16210488 ] ASF GitHub Bot commented on FLINK-5486: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4356 @tedyu I am retesting now..let us wait what will happen. > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210974#comment-16210974 ] ASF GitHub Bot commented on FLINK-5486: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/4356 Thanks for your contribution. I have a few questions because the submitted code does not match with what the ticket currently says: From https://issues.apache.org/jira/browse/FLINK-5486: >The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside the synchronization block. Otherwise during the processing of handlePendingFilesForPreviousCheckpoints(), some entries of the map may be cleared. I am confused because the submitted code encloses `bucketState.pendingFilesPerCheckpoint.clear();` in a synchronized block. @tedyu Did you run into actual production issues, or did you find the problem while reviewing the code? Can you elaborate why the additional synchronized block is needed? > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211139#comment-16211139 ] Ted Yu commented on FLINK-5486: --- The reason for additional synchronization is in description. This was discovered during code review. > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091214#comment-16091214 ] ASF GitHub Bot commented on FLINK-5486: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4356 [FLINK-5486] Fix lacking of synchronization in BucketingSink#handleRe… You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-5486 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4356.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4356 commit 7399ce71c47c242566d5f6959fec2ee666929b89 Author: zhangminglei Date: 2017-07-18T07:07:09Z [FLINK-5486] Fix lacking of synchronization in BucketingSink#handleRestoredBucketState > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099308#comment-16099308 ] ASF GitHub Bot commented on FLINK-5486: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4356 lgtm > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15822346#comment-15822346 ] Ted Yu commented on FLINK-5486: --- Lock on State.bucketStates should be held in the following method: {code} private void handleRestoredBucketState(State restoredState) { Preconditions.checkNotNull(restoredState); for (BucketState bucketState : restoredState.bucketStates.values()) { {code} > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.3.4#6332)