[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16304163#comment-16304163 ]
Ted Yu edited comment on FLINK-4534 at 1/26/18 4:27 AM: -------------------------------------------------------- Can this get more review ? Thanks was (Author: yuzhih...@gmail.com): lgtm > Lack of synchronization in BucketingSink#restoreState() > ------------------------------------------------------- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Reporter: Ted Yu > Assignee: mingleizhang > Priority: Major > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState<T> bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry<String, BucketState<T>> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set<Long> pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)