Repository: kafka Updated Branches: refs/heads/trunk 6feaa8a58 -> 84a14fec2
KAFKA-4843: More efficient round-robin scheduler - Improves streams efficiency by more than 200K requests/second (small 100 byte requests) - Gets streams efficiency very close to pure consumer (see results in https://jenkins.confluent.io/job/system-test-kafka-branch-builder/746/console) - Maintains same fairness across tasks - Schedules all records in the queue in-between poll() calls, not just one per task. Author: Eno Thereska <[email protected]> Author: Eno Thereska <[email protected]> Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang Closes #2643 from enothereska/minor-schedule-round-robin Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84a14fec Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84a14fec Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84a14fec Branch: refs/heads/trunk Commit: 84a14fec299749a208251bce1a0eb9c1a8241d08 Parents: 6feaa8a Author: Eno Thereska <[email protected]> Authored: Wed Mar 29 15:00:26 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 29 15:00:26 2017 -0700 ---------------------------------------------------------------------- .../streams/processor/internals/StreamTask.java | 17 +- .../processor/internals/StreamThread.java | 225 ++++++++++++------- .../processor/internals/StreamTaskTest.java | 45 ++-- .../streams/streams_simple_benchmark_test.py | 4 +- tests/kafkatest/services/streams.py | 2 +- 5 files changed, 188 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7bd4be4..092d6e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -169,19 +169,26 @@ public class StreamTask extends AbstractTask implements Punctuator { } /** - * Process one record + * @return The number of records left in the buffer of this task's partition group + */ + public int numBuffered() { + return partitionGroup.numBuffered(); + } + + /** + * Process one record. * - * @return number of records left in the buffer of this task's partition group after the processing is done + * @return true if this method processes a record, false if it does not process a record. */ @SuppressWarnings("unchecked") - public int process() { + public boolean process() { // get the next record to process StampedRecord record = partitionGroup.nextRecord(recordInfo); // if there is no record to process, return immediately if (record == null) { requiresPoll = true; - return 0; + return false; } requiresPoll = false; @@ -224,7 +231,7 @@ public class StreamTask extends AbstractTask implements Punctuator { processorContext.setCurrentNode(null); } - return partitionGroup.numBuffered(); + return true; } private void updateProcessorContext(final ProcessorRecordContext recordContext, final ProcessorNode currNode) { http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 61d7d72..b90bde5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -223,6 +223,7 @@ public class StreamThread extends Thread { private final TaskCreator taskCreator = new TaskCreator(); final ConsumerRebalanceListener rebalanceListener; + private final static int UNLIMITED_RECORDS = -1; public synchronized boolean isInitialized() { return state == State.RUNNING; @@ -519,107 +520,168 @@ public class StreamThread extends Thread { return Math.max(this.timerStartedMs - previousTimeMs, 0); } - private void runLoop() { - int totalNumBuffered = 0; - boolean requiresPoll = true; - boolean polledRecords = false; - - consumer.subscribe(sourceTopicPattern, rebalanceListener); - - while (stillRunning()) { - this.timerStartedMs = time.milliseconds(); - - // try to fetch some records if necessary - if (requiresPoll) { - requiresPoll = false; - - boolean longPoll = totalNumBuffered == 0; + /** + * Get the next batch of records by polling. + * @return Next batch of records or null if no records available. + */ + private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) { + ConsumerRecords<byte[], byte[]> records = null; - ConsumerRecords<byte[], byte[]> records = null; + try { + records = consumer.poll(pollTimeMs); + } catch (NoOffsetForPartitionException ex) { + TopicPartition partition = ex.partition(); + if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { + log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic())); + consumer.seekToBeginning(ex.partitions()); + } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { + consumer.seekToEnd(ex.partitions()); + log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic())); + } else { - try { - records = consumer.poll(longPoll ? this.pollTimeMs : 0); - } catch (NoOffsetForPartitionException ex) { - TopicPartition partition = ex.partition(); - if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { - log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic())); - consumer.seekToBeginning(ex.partitions()); - } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { - consumer.seekToEnd(ex.partitions()); - log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic())); - } else { + if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { + setState(State.PENDING_SHUTDOWN); + String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + + " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + + "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; + throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex); + } - if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { - setState(State.PENDING_SHUTDOWN); - String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + - " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + - "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; - throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex); - } + if (originalReset.equals("earliest")) { + consumer.seekToBeginning(ex.partitions()); + } else if (originalReset.equals("latest")) { + consumer.seekToEnd(ex.partitions()); + } + log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset)); + } - if (originalReset.equals("earliest")) { - consumer.seekToBeginning(ex.partitions()); - } else if (originalReset.equals("latest")) { - consumer.seekToEnd(ex.partitions()); - } - log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset)); - } + } - } + if (rebalanceException != null) + throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException); - if (rebalanceException != null) - throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException); + return records; + } - if (records != null && !records.isEmpty()) { - int numAddedRecords = 0; + /** + * Take records and add them to each respective task + * @param records Records, can be null + */ + private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) { + if (records != null && !records.isEmpty()) { + int numAddedRecords = 0; - for (TopicPartition partition : records.partitions()) { - StreamTask task = activeTasksByPartition.get(partition); - numAddedRecords += task.addRecords(partition, records.records(partition)); - } - streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs); - polledRecords = true; - } else { - polledRecords = false; - } + for (TopicPartition partition : records.partitions()) { + StreamTask task = activeTasksByPartition.get(partition); + numAddedRecords += task.addRecords(partition, records.records(partition)); + } + streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs); + } + } - // only record poll latency is long poll is required - if (longPoll) { - streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); + /** + * Schedule the records processing by selecting which record is processed next. Commits may + * happen as records are processed. + * @tasks The tasks that have records. + * @param recordsProcessedBeforeCommit number of records to be processed before commit is called. + * if UNLIMITED_RECORDS, then commit is never called + * @return Number of records processed since last commit. + */ + private long processAndPunctuate(final Map<TaskId, StreamTask> tasks, + final long recordsProcessedBeforeCommit) { + + long totalProcessedEachRound; + long totalProcessedSinceLastMaybeCommit = 0; + // Round-robin scheduling by taking one record from each task repeatedly + // until no task has any records left + do { + totalProcessedEachRound = 0; + for (StreamTask task : tasks.values()) { + // we processed one record, + // and more are buffered waiting for the next round + if (task.process()) { + totalProcessedEachRound++; + totalProcessedSinceLastMaybeCommit++; } } + if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS && + totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) { + totalProcessedSinceLastMaybeCommit = 0; + long processLatency = computeLatency(); + streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessedSinceLastMaybeCommit, + timerStartedMs); + maybeCommit(this.timerStartedMs); + } + } while (totalProcessedEachRound != 0); - // try to process one fetch record from each task via the topology, and also trigger punctuate - // functions if necessary, which may result in more records going through the topology in this loop - if (totalNumBuffered > 0 || polledRecords) { - totalNumBuffered = 0; - - if (!activeTasks.isEmpty()) { - for (StreamTask task : activeTasks.values()) { + // go over the tasks again to punctuate or commit + for (StreamTask task : tasks.values()) { + maybePunctuate(task); + if (task.commitNeeded()) + commitOne(task); + } - totalNumBuffered += task.process(); + return totalProcessedSinceLastMaybeCommit; + } - requiresPoll = requiresPoll || task.requiresPoll(); + /** + * Adjust the number of records that should be processed by scheduler. This avoids + * scenarios where the processing time is higher than the commit time. + * @param prevRecordsProcessedBeforeCommit Previous number of records processed by scheduler. + * @param totalProcessed Total number of records processed in this last round. + * @param processLatency Total processing latency in ms processed in this last round. + * @param commitTime Desired commit time in ms. + * @return An adjusted number of records to be processed in the next round. + */ + private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedBeforeCommit, final long totalProcessed, + final long processLatency, final long commitTime) { + long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; + // check if process latency larger than commit latency + // note that once we set recordsProcessedBeforeCommit, it will never be UNLIMITED_RECORDS again, so + // we will never process all records again. This might be an issue if the initial measurement + // was off due to a slow start. + if (processLatency > commitTime) { + // push down + recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); + log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", + logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); + } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) { + // push up + recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); + log.debug("{} processing latency {} > commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}", + logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); + } - streamsMetrics.processTimeSensor.record(computeLatency(), timerStartedMs); + return recordsProcessedBeforeCommit; + } - maybePunctuate(task); + /** + * Main event loop for polling, and processing records through topologies. + */ + private void runLoop() { + long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; + consumer.subscribe(sourceTopicPattern, rebalanceListener); - if (task.commitNeeded()) - commitOne(task); - } + while (stillRunning()) { + this.timerStartedMs = time.milliseconds(); - } else { - // even when no task is assigned, we must poll to get a task. - requiresPoll = true; + // try to fetch some records if necessary + ConsumerRecords<byte[], byte[]> records = pollRequests(this.pollTimeMs); + if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) { + streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); + addRecordsToTasks(records); + final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit); + if (totalProcessed > 0) { + final long processLatency = computeLatency(); + streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, + timerStartedMs); + recordsProcessedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed, + processLatency, commitTimeMs); } - - } else { - requiresPoll = true; } + maybeCommit(timerStartedMs); maybeUpdateStandbyTasks(); - maybeClean(timerStartedMs); } log.info("{} Shutting down at user request", logPrefix); @@ -692,8 +754,9 @@ public class StreamThread extends Thread { protected void maybeCommit(final long now) { if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { - log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed", - logPrefix, commitTimeMs, activeTasks, standbyTasks); + + log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed by {}ms", + logPrefix, activeTasks, standbyTasks, commitTimeMs, now - lastCommitMs); commitAll(); lastCommitMs = now; http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c1dce59..7c9f46b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -172,27 +172,33 @@ public class StreamTaskTest { new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); - assertEquals(5, task.process()); + assertTrue(task.process()); + assertEquals(5, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(0, source2.numReceived); - assertEquals(4, task.process()); + assertTrue(task.process()); + assertEquals(4, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(0, source2.numReceived); - assertEquals(3, task.process()); + assertTrue(task.process()); + assertEquals(3, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(1, source2.numReceived); - assertEquals(2, task.process()); + assertTrue(task.process()); + assertEquals(2, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(1, source2.numReceived); - assertEquals(1, task.process()); + assertTrue(task.process()); + assertEquals(1, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(2, source2.numReceived); - assertEquals(0, task.process()); + assertTrue(task.process()); + assertEquals(0, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(3, source2.numReceived); } @@ -234,7 +240,7 @@ public class StreamTaskTest { new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); - assertEquals(5, task.process()); + assertTrue(task.process()); assertEquals(1, source1.numReceived); assertEquals(0, source2.numReceived); @@ -251,21 +257,21 @@ public class StreamTaskTest { assertTrue(consumer.paused().contains(partition1)); assertTrue(consumer.paused().contains(partition2)); - assertEquals(7, task.process()); + assertTrue(task.process()); assertEquals(2, source1.numReceived); assertEquals(0, source2.numReceived); assertEquals(1, consumer.paused().size()); assertTrue(consumer.paused().contains(partition2)); - assertEquals(6, task.process()); + assertTrue(task.process()); assertEquals(3, source1.numReceived); assertEquals(0, source2.numReceived); assertEquals(1, consumer.paused().size()); assertTrue(consumer.paused().contains(partition2)); - assertEquals(5, task.process()); + assertTrue(task.process()); assertEquals(3, source1.numReceived); assertEquals(1, source2.numReceived); @@ -289,40 +295,47 @@ public class StreamTaskTest { assertTrue(task.maybePunctuate()); - assertEquals(5, task.process()); + assertTrue(task.process()); + assertEquals(5, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(0, source2.numReceived); assertFalse(task.maybePunctuate()); - assertEquals(4, task.process()); + assertTrue(task.process()); + assertEquals(4, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(1, source2.numReceived); assertTrue(task.maybePunctuate()); - assertEquals(3, task.process()); + assertTrue(task.process()); + assertEquals(3, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(1, source2.numReceived); assertFalse(task.maybePunctuate()); - assertEquals(2, task.process()); + assertTrue(task.process()); + assertEquals(2, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(2, source2.numReceived); assertTrue(task.maybePunctuate()); - assertEquals(1, task.process()); + assertTrue(task.process()); + assertEquals(1, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(2, source2.numReceived); assertFalse(task.maybePunctuate()); - assertEquals(0, task.process()); + assertTrue(task.process()); + assertEquals(0, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(3, source2.numReceived); + assertFalse(task.process()); assertFalse(task.maybePunctuate()); processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L); http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index c9f970e..9f0e457 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -30,12 +30,12 @@ class StreamsSimpleBenchmarkTest(Test): def __init__(self, test_context): super(StreamsSimpleBenchmarkTest, self).__init__(test_context) - self.num_records = 10000000L + self.num_records = 20000000L self.replication = 1 @cluster(num_nodes=9) - @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 2, 3]) + @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3]) def test_simple_benchmark(self, test, scale): """ Run simple Kafka Streams benchmark http://git-wip-us.apache.org/repos/asf/kafka/blob/84a14fec/tests/kafkatest/services/streams.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 4f8f1a3..e7be947 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): self.logger.info("Restarting Kafka Streams on " + str(node.account)) self.start_node(node) - def wait(self, timeout_sec=720): + def wait(self, timeout_sec=1440): for node in self.nodes: self.wait_node(node, timeout_sec)
