[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112770#comment-16112770
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
```
Tests run: 6, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 91.985 sec 
<<< FAILURE! - in 
org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase

testExternalizedFullRocksDBCheckpointsStandalone(org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase)
  Time elapsed: 29.976 sec  <<< ERROR!
java.io.IOException: java.lang.Exception: Failed to complete checkpoint
at 
org.apache.flink.runtime.testingUtils.TestingCluster.requestCheckpoint(TestingCluster.scala:399)
at 
org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedCheckpoints(ExternalizedCheckpointITCase.java:218)
at 
org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedFullRocksDBCheckpointsStandalone(ExternalizedCheckpointITCase.java:78)
Caused by: java.lang.Exception: Failed to complete checkpoint
at 
org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:375)
at 
org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:358)
at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
at akka.dispatch.OnComplete.internal(Future.scala:247)
at akka.dispatch.OnComplete.internal(Future.scala:245)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
```


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16131628#comment-16131628
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4356
  
Run QA again ?


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16131631#comment-16131631
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
Okay. Ted. I will trigger the QA soon.


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134044#comment-16134044
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
triggering now


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-10-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649306#comment-16649306
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

GJL closed pull request #4356: [FLINK-5486] Fix lacking of synchronization in 
BucketingSink#handleRe…
URL: https://github.com/apache/flink/pull/4356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 55400c6ce19..bd5058a4fdf 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -719,8 +719,9 @@ private void handleRestoredBucketState(State 
restoredState) {
bucketState.isWriterOpen = false;
 

handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
-
-   bucketState.pendingFilesPerCheckpoint.clear();
+   synchronized (bucketState.pendingFilesPerCheckpoint) {
+   bucketState.pendingFilesPerCheckpoint.clear();
+   }
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-10-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649305#comment-16649305
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

GJL commented on issue #4356: [FLINK-5486] Fix lacking of synchronization in 
BucketingSink#handleRe…
URL: https://github.com/apache/flink/pull/4356#issuecomment-429608448
 
 
   I am closing this PR because 
[FLINK-5486](https://issues.apache.org/jira/browse/FLINK-5486?focusedCommentId=16649304&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16649304)
 got closed. Please reopen the ticket/PR if you disagree.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Ted Yu
>Assignee: zhangminglei
>Priority: Major
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-06-06 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503691#comment-16503691
 ] 

Gary Yao commented on FLINK-5486:
-

I don't see why this is needed. Flink callbacks are generally not invoked 
concurrently.

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.3.4
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-02-04 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351986#comment-16351986
 ] 

Ted Yu commented on FLINK-5486:
---

Can this get more review, please ?

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199255#comment-16199255
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4356
  
retest this please


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199652#comment-16199652
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
OKay. Thanks , Ted. Will retest again soon enough./


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210488#comment-16210488
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
@tedyu I am retesting now..let us wait what will happen.


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210974#comment-16210974
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4356
  
Thanks for your contribution. I have a few questions because the submitted 
code does not match with what the ticket currently says:

From https://issues.apache.org/jira/browse/FLINK-5486:
>The handlePendingFilesForPreviousCheckpoints() call should be enclosed 
inside the synchronization block. Otherwise during the processing of 
handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
cleared.

I am confused because the submitted code encloses 
`bucketState.pendingFilesPerCheckpoint.clear();` in a synchronized block.

@tedyu Did you run into actual production issues, or did you find the 
problem while reviewing the code? Can you elaborate why the additional 
synchronized block is needed? 


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-10-19 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211139#comment-16211139
 ] 

Ted Yu commented on FLINK-5486:
---

The reason for additional synchronization is in description.

This was discovered during code review.

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-07-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091214#comment-16091214
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4356

[FLINK-5486] Fix lacking of synchronization in BucketingSink#handleRe…



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-5486

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4356.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4356


commit 7399ce71c47c242566d5f6959fec2ee666929b89
Author: zhangminglei 
Date:   2017-07-18T07:07:09Z

[FLINK-5486] Fix lacking of synchronization in 
BucketingSink#handleRestoredBucketState




> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099308#comment-16099308
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4356
  
lgtm


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-01-13 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15822346#comment-15822346
 ] 

Ted Yu commented on FLINK-5486:
---

Lock on State.bucketStates should be held in the following method:
{code}
  private void handleRestoredBucketState(State restoredState) {
Preconditions.checkNotNull(restoredState);

for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)