[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu updated FLINK-4534:
--------------------------
    Description:
Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState():
{code}
    for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
    for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
      closeCurrentPartFile(entry.getValue());
{code}
A similar issue exists for bucketState.pendingFilesPerCheckpoint, starting at line 752:
{code}
      Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
      LOG.debug("Moving pending files to final location on restore.");
      for (Long pastCheckpointId : pastCheckpointIds) {
{code}

> Lack of synchronization in BucketingSink#restoreState()
> -------------------------------------------------------
>
>                 Key: FLINK-4534
>                 URL: https://issues.apache.org/jira/browse/FLINK-4534
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Ted Yu
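For illustration only, here is a minimal sketch of what guarding these iterations might look like. It assumes the lock used elsewhere is the map object itself (state.bucketStates, and bucketState.pendingFilesPerCheckpoint for the inner map); the class SynchronizedBucketIteration, the simplified BucketState and State types, and the addPending() helper are hypothetical stand-ins, not the actual BucketingSink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink. Field names mirror the snippets in the
// description, but this is NOT the real BucketingSink implementation.
class SynchronizedBucketIteration {

    /** Simplified, non-generic stand-in for BucketState<T>. */
    static class BucketState {
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
        boolean closed = false;
    }

    /** Simplified stand-in for the sink's state holder. */
    static class State {
        final Map<String, BucketState> bucketStates = new HashMap<>();
    }

    private final State state = new State();

    /** Hypothetical writer path: registers a pending file under both locks. */
    void addPending(String bucket, long checkpointId, String file) {
        synchronized (state.bucketStates) {
            BucketState bs =
                state.bucketStates.computeIfAbsent(bucket, k -> new BucketState());
            synchronized (bs.pendingFilesPerCheckpoint) {
                bs.pendingFilesPerCheckpoint
                    .computeIfAbsent(checkpointId, k -> new ArrayList<>())
                    .add(file);
            }
        }
    }

    /** Iterates bucketStates and each pending-file map while holding their locks. */
    List<String> restoreState() {
        List<String> moved = new ArrayList<>();
        synchronized (state.bucketStates) {
            for (BucketState bs : state.bucketStates.values()) {
                synchronized (bs.pendingFilesPerCheckpoint) {
                    // Copy the key set so removing entries does not
                    // invalidate the iteration.
                    List<Long> pastCheckpointIds =
                        new ArrayList<>(bs.pendingFilesPerCheckpoint.keySet());
                    for (Long pastCheckpointId : pastCheckpointIds) {
                        moved.addAll(bs.pendingFilesPerCheckpoint.remove(pastCheckpointId));
                    }
                }
            }
        }
        return moved;
    }

    /** Marks every bucket closed under the same lock; returns how many were visited. */
    int close() {
        int visited = 0;
        synchronized (state.bucketStates) {
            for (Map.Entry<String, BucketState> entry : state.bucketStates.entrySet()) {
                entry.getValue().closed = true; // stands in for closeCurrentPartFile(...)
                visited++;
            }
        }
        return visited;
    }
}
```

Both methods acquire the locks in the same order (outer map first, then the per-bucket map), which avoids introducing a lock-ordering deadlock between them. Whether restoreState() and close() can actually race with other methods in the real sink depends on how the runtime invokes them, which this sketch does not model.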