Repository: kafka Updated Branches: refs/heads/trunk 404b696be -> 3fd9be49a
MINOR: Remove synchronized as the tasks are executed sequentially Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1441 from Ishiihara/remove-synchronized Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3fd9be49 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3fd9be49 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3fd9be49 Branch: refs/heads/trunk Commit: 3fd9be49ac35adaca401f58552b3ffa68f8d4eaa Parents: 404b696 Author: Liquan Pei <[email protected]> Authored: Mon May 30 17:11:17 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon May 30 17:11:17 2016 -0700 ---------------------------------------------------------------------- .../connect/storage/MemoryOffsetBackingStore.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3fd9be49/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index e319393..baa8192 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -83,10 +83,8 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore { @Override public Map<ByteBuffer, ByteBuffer> call() throws Exception { Map<ByteBuffer, ByteBuffer> result = new HashMap<>(); - synchronized (MemoryOffsetBackingStore.this) { - for (ByteBuffer key : keys) { - result.put(key, data.get(key)); - } + for (ByteBuffer key : keys) { + result.put(key, data.get(key)); } if (callback != null) callback.onCompletion(null, result); @@ -102,12 +100,10 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore { return executor.submit(new Callable<Void>() { @Override public Void call() throws Exception { - synchronized (MemoryOffsetBackingStore.this) { - for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) { - data.put(entry.getKey(), entry.getValue()); - } - save(); + for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) { + data.put(entry.getKey(), entry.getValue()); } + save(); if (callback != null) callback.onCompletion(null, null); return null;
