Repository: kafka
Updated Branches:
  refs/heads/0.9.0 fe855f982 -> 537aeae33
KAFKA-2859: Fix deadlock in WorkerSourceTask. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes #554 from ewencp/kafka-2859-deadlock-worker-source-task (cherry picked from commit 0f00ec97ae328d04c29cb1cb3eabad3f17e31582) Signed-off-by: Gwen Shapira <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/537aeae3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/537aeae3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/537aeae3 Branch: refs/heads/0.9.0 Commit: 537aeae33403e96d1736f049415ec42ea2f7f8d1 Parents: fe855f9 Author: Ewen Cheslack-Postava <[email protected]> Authored: Wed Nov 18 14:19:37 2015 -0800 Committer: Gwen Shapira <[email protected]> Committed: Wed Nov 18 14:19:51 2015 -0800 ---------------------------------------------------------------------- .../kafka/connect/runtime/WorkerSourceTask.java | 112 +++++++++++------ .../connect/runtime/WorkerSourceTaskTest.java | 124 +++++++++++++++---- 2 files changed, 176 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/537aeae3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 6cf1dd7..0176dbc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -17,11 +17,14 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.utils.Time; import 
org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; @@ -35,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -46,6 +50,8 @@ import java.util.concurrent.TimeoutException; class WorkerSourceTask implements WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); + private static final long SEND_FAILED_BACKOFF_MS = 100; + private final ConnectorTaskId id; private final SourceTask task; private final Converter keyConverter; @@ -57,12 +63,15 @@ class WorkerSourceTask implements WorkerTask { private final WorkerConfig workerConfig; private final Time time; + private List<SourceRecord> toSend; + private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because // there is no IdentityHashSet. 
private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages; // A second buffer is used while an offset flush is running private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog; private boolean flushing; + private CountDownLatch stopRequestedLatch; public WorkerSourceTask(ConnectorTaskId id, SourceTask task, Converter keyConverter, Converter valueConverter, @@ -79,9 +88,12 @@ class WorkerSourceTask implements WorkerTask { this.workerConfig = workerConfig; this.time = time; + this.toSend = null; + this.lastSendFailed = false; this.outstandingMessages = new IdentityHashMap<>(); this.outstandingMessagesBacklog = new IdentityHashMap<>(); this.flushing = false; + this.stopRequestedLatch = new CountDownLatch(1); } @Override @@ -92,8 +104,10 @@ class WorkerSourceTask implements WorkerTask { @Override public void stop() { - if (workThread != null) + if (workThread != null) { workThread.startGracefulShutdown(); + stopRequestedLatch.countDown(); + } } @Override @@ -117,47 +131,69 @@ class WorkerSourceTask implements WorkerTask { } /** - * Send a batch of records. This is atomic up to the point of getting the messages into the - * Producer and recorded in our set of outstanding messages, so either all or none will be sent - * @param records + * Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can + * be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException. 
+ * @return true if all messages were sent, false if some need to be retried */ - private synchronized void sendRecords(List<SourceRecord> records) { - for (final SourceRecord record : records) { + private boolean sendRecords() { + int processed = 0; + for (final SourceRecord record : toSend) { byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value); log.trace("Appending record with key {}, value {}", record.key(), record.value()); - if (!flushing) { - outstandingMessages.put(producerRecord, producerRecord); - } else { - outstandingMessagesBacklog.put(producerRecord, producerRecord); + // We need this queued first since the callback could happen immediately (even synchronously in some cases). + // Because of this we need to be careful about handling retries -- we always save the previously attempted + // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding + // messages and update the offsets. + synchronized (this) { + if (!lastSendFailed) { + if (!flushing) { + outstandingMessages.put(producerRecord, producerRecord); + } else { + outstandingMessagesBacklog.put(producerRecord, producerRecord); + } + // Offsets are converted & serialized in the OffsetWriter + offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); + } } - producer.send( - producerRecord, - new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - // Given the default settings for zero data loss, this should basically never happen -- - // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request - // timeouts, callbacks with exceptions should never be invoked in practice. 
If the - // user overrode these settings, the best we can do is notify them of the failure via - // logging. - log.error("{} failed to send record to {}: {}", id, record.topic(), e); - log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}", - record.topic(), record.kafkaPartition(), record.key(), record.value(), - record.sourceOffset(), record.sourcePartition()); - } else { - log.trace("Wrote record successfully: topic {} partition {} offset {}", - recordMetadata.topic(), recordMetadata.partition(), - recordMetadata.offset()); + try { + producer.send( + producerRecord, + new Callback() { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + // Given the default settings for zero data loss, this should basically never happen -- + // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request + // timeouts, callbacks with exceptions should never be invoked in practice. If the + // user overrode these settings, the best we can do is notify them of the failure via + // logging. 
+ log.error("{} failed to send record to {}: {}", id, record.topic(), e); + log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}", + record.topic(), record.kafkaPartition(), record.key(), record.value(), + record.sourceOffset(), record.sourcePartition()); + } else { + log.trace("Wrote record successfully: topic {} partition {} offset {}", + recordMetadata.topic(), recordMetadata.partition(), + recordMetadata.offset()); + } + recordSent(producerRecord); } - recordSent(producerRecord); - } - }); - // Offsets are converted & serialized in the OffsetWriter - offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); + }); + lastSendFailed = false; + } catch (RetriableException e) { + log.warn("Failed to send {}, backing off before retrying:", producerRecord, e); + toSend = toSend.subList(processed, toSend.size()); + lastSendFailed = true; + return false; + } catch (KafkaException e) { + throw new ConnectException("Unrecoverable exception trying to send", e); + } + processed++; } + toSend = null; + return true; } private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) { @@ -307,10 +343,12 @@ class WorkerSourceTask implements WorkerTask { } while (getRunning()) { - List<SourceRecord> records = task.poll(); - if (records == null) + if (toSend == null) + toSend = task.poll(); + if (toSend == null) continue; - sendRecords(records); + if (!sendRecords()) + stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { // Ignore and allow to exit. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/537aeae3/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 9a382b6..7380f1c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -198,11 +198,12 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Can just use the same record for key and value records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); - Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(); + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); PowerMock.replayAll(); - Whitebox.invokeMethod(workerTask, "sendRecords", records); + Whitebox.setInternalState(workerTask, "toSend", records); + Whitebox.invokeMethod(workerTask, "sendRecords"); assertEquals(SERIALIZED_KEY, sent.getValue().key()); assertEquals(SERIALIZED_RECORD, sent.getValue().value()); @@ -210,6 +211,40 @@ public class WorkerSourceTaskTest extends ThreadedTest { } @Test + public void testSendRecordsRetries() throws Exception { + createWorkerTask(); + + // Differentiate only by Kafka partition so we can reuse conversion expectations + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + // First round + expectSendRecordOnce(false); + // Any Producer retriable exception 
should work here + expectSendRecordSyncFailure(new org.apache.kafka.common.errors.TimeoutException("retriable sync failure")); + + // Second round + expectSendRecordOnce(true); + expectSendRecordOnce(false); + + PowerMock.replayAll(); + + // Try to send 3, make first pass, second fail. Should save last two + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertEquals(Arrays.asList(record2, record3), Whitebox.getInternalState(workerTask, "toSend")); + + // Next they all succeed + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + assertNull(Whitebox.getInternalState(workerTask, "toSend")); + + PowerMock.verifyAll(); + } + + @Test public void testSlowTaskStart() throws Exception { createWorkerTask(); @@ -252,38 +287,81 @@ public class WorkerSourceTaskTest extends ThreadedTest { } }); // Fallout of the poll() call - expectSendRecord(); + expectSendRecordAnyTimes(); return latch; } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws InterruptedException { - EasyMock.expect(keyConverter.fromConnectData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY); - EasyMock.expect(valueConverter.fromConnectData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD); + private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException { + expectConvertKeyValue(false); + + offsetWriter.offset(PARTITION, OFFSET); + PowerMock.expectLastCall(); - Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - // 1. 
Converted data passed to the producer, which will need callbacks invoked for flush to work EasyMock.expect( + producer.send(EasyMock.anyObject(ProducerRecord.class), + EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) + .andThrow(error); + } + + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException { + return expectSendRecord(true, false); + } + + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) throws InterruptedException { + return expectSendRecord(false, isRetry); + } + + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry) throws InterruptedException { + expectConvertKeyValue(anyTimes); + + Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); + + // 1. Offset data is passed to the offset storage. + if (!isRetry) { + offsetWriter.offset(PARTITION, OFFSET); + if (anyTimes) + PowerMock.expectLastCall().anyTimes(); + else + PowerMock.expectLastCall(); + } + + // 2. 
Converted data passed to the producer, which will need callbacks invoked for flush to work + IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( producer.send(EasyMock.capture(sent), - EasyMock.capture(producerCallbacks))) - .andStubAnswer(new IAnswer<Future<RecordMetadata>>() { - @Override - public Future<RecordMetadata> answer() throws Throwable { - synchronized (producerCallbacks) { - for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null); - } - producerCallbacks.reset(); - } - return sendFuture; + EasyMock.capture(producerCallbacks))); + IAnswer<Future<RecordMetadata>> expectResponse = new IAnswer<Future<RecordMetadata>>() { + @Override + public Future<RecordMetadata> answer() throws Throwable { + synchronized (producerCallbacks) { + for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null); } - }); - // 2. Offset data is passed to the offset storage. 
- offsetWriter.offset(PARTITION, OFFSET); - PowerMock.expectLastCall().anyTimes(); + producerCallbacks.reset(); + } + return sendFuture; + } + }; + if (anyTimes) + expect.andStubAnswer(expectResponse); + else + expect.andAnswer(expectResponse); return sent; } + private void expectConvertKeyValue(boolean anyTimes) { + IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(TOPIC, KEY_SCHEMA, KEY)); + if (anyTimes) + convertKeyExpect.andStubReturn(SERIALIZED_KEY); + else + convertKeyExpect.andReturn(SERIALIZED_KEY); + IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(TOPIC, RECORD_SCHEMA, RECORD)); + if (anyTimes) + convertValueExpect.andStubReturn(SERIALIZED_RECORD); + else + convertValueExpect.andReturn(SERIALIZED_RECORD); + } + private void awaitPolls(CountDownLatch latch) throws InterruptedException { latch.await(1000, TimeUnit.MILLISECONDS); }
