Repository: kafka
Updated Branches:
  refs/heads/0.11.0 fc3eeb004 -> d93bd1aff
KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush

Author: oleg <o...@nexla.com>

Reviewers: Randall Hauch <rha...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3702 from oleg-smith/KAFKA-5756

(cherry picked from commit 51025764601760684f3dbdc0171fd7aa2ebe70e7)
Signed-off-by: Jason Gustafson <ja...@confluent.io>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d93bd1af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d93bd1af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d93bd1af

Branch: refs/heads/0.11.0
Commit: d93bd1aff96b418c22cdbf9af355c6185d1e9192
Parents: fc3eeb0
Author: oleg <o...@nexla.com>
Authored: Wed Sep 6 11:12:23 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Sep 6 11:25:26 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java |  2 +-
 .../connect/storage/OffsetStorageWriter.java    | 65 +++++++++++---------
 2 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d93bd1af/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 5627145..163c7d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -159,7 +159,7 @@ class WorkerSourceTask extends WorkerTask {
                 }
 
                 if (toSend == null) {
-                    log.debug("Nothing to send to Kafka. Polling source for additional records");
+                    log.trace("Nothing to send to Kafka. Polling source for additional records");
                     toSend = task.poll();
                 }
                 if (toSend == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d93bd1af/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index c5d1467..3239b67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
  * time.
  * </p>
  * <p>
- * This class is not thread-safe. It should only be accessed from a Task's processing thread.
+ * This class is thread-safe.
  * </p>
  */
 public class OffsetStorageWriter {
@@ -72,7 +72,6 @@ public class OffsetStorageWriter {
     // Offset data in Connect format
     private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
 
-    // Not synchronized, should only be accessed by flush thread
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
@@ -129,44 +128,50 @@ public class OffsetStorageWriter {
      * @return a Future, or null if there are no offsets to commitOffsets
      */
     public Future<Void> doFlush(final Callback<Void> callback) {
-        final long flushId = currentFlushId;
+        final long flushId;
 
         // Serialize
-        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
-        try {
-            offsetsSerialized = new HashMap<>();
-            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
-                // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
-                // for that data. The only enforcement of the format is here.
-                OffsetUtils.validateFormat(entry.getKey());
-                OffsetUtils.validateFormat(entry.getValue());
-                // When serializing the key, we add in the namespace information so the key is [namespace, real key]
-                byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
-                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
-                ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
-                offsetsSerialized.put(keyBuffer, valueBuffer);
+        final Map<ByteBuffer, ByteBuffer> offsetsSerialized;
+
+        synchronized (this) {
+            flushId = currentFlushId;
+
+            try {
+                offsetsSerialized = new HashMap<>(toFlush.size());
+                for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
+                    // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
+                    // for that data. The only enforcement of the format is here.
+                    OffsetUtils.validateFormat(entry.getKey());
+                    OffsetUtils.validateFormat(entry.getValue());
+                    // When serializing the key, we add in the namespace information so the key is [namespace, real key]
+                    byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
+                    ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
+                    byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
+                    ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
+                    offsetsSerialized.put(keyBuffer, valueBuffer);
+                }
+            } catch (Throwable t) {
+                // Must handle errors properly here or the writer will be left mid-flush forever and be
+                // unable to make progress.
+                log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+                        + "offsets under namespace {}. This likely won't recover unless the "
+                        + "unserializable partition or offset information is overwritten.", namespace);
+                log.error("Cause of serialization failure:", t);
+                callback.onCompletion(t, null);
+                return null;
             }
-        } catch (Throwable t) {
-            // Must handle errors properly here or the writer will be left mid-flush forever and be
-            // unable to make progress.
-            log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
-                    + "offsets under namespace {}. This likely won't recover unless the "
-                    + "unserializable partition or offset information is overwritten.", namespace);
-            log.error("Cause of serialization failure:", t);
-            callback.onCompletion(t, null);
-            return null;
+
+            // And submit the data
+            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush);
         }
 
-        // And submit the data
-        log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
-        log.debug("The offsets are: " + toFlush.toString());
         return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
                 boolean isCurrent = handleFinishWrite(flushId, error, result);
-                if (isCurrent && callback != null)
+                if (isCurrent && callback != null) {
                     callback.onCompletion(error, result);
+                }
             }
         });
     }