Repository: kafka Updated Branches: refs/heads/trunk 80e0af50d -> bd0146d98
MINOR: various random minor fixes and improve KafkaConsumer JavaDocs Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Damian Guy <damian....@gmail.com>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3884 from mjsax/minor-fixed-discoverd-via-exception-handling-investigation Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd0146d9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd0146d9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd0146d9 Branch: refs/heads/trunk Commit: bd0146d984dd5df82fb19aa936e8f4ff9ca40030 Parents: 80e0af5 Author: Matthias J. Sax <matth...@confluent.io> Authored: Wed Sep 20 07:13:03 2017 +0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Sep 20 07:13:03 2017 +0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 104 ++++++++++------ .../producer/BufferExhaustedException.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 15 ++- .../apache/kafka/clients/producer/Producer.java | 33 +++--- .../kafka/common/metrics/KafkaMetric.java | 1 - .../main/scala/kafka/tools/StreamsResetter.java | 2 - .../org/apache/kafka/streams/KafkaStreams.java | 118 ++++++++++--------- .../org/apache/kafka/streams/StreamsConfig.java | 8 +- .../internals/AbstractProcessorContext.java | 15 +-- .../processor/internals/AbstractTask.java | 1 + .../processor/internals/AssignedTasks.java | 8 +- .../internals/InternalTopologyBuilder.java | 2 +- .../processor/internals/RecordQueue.java | 28 ++--- .../internals/SourceNodeRecordDeserializer.java | 4 +- .../processor/internals/StandbyTask.java | 37 +++--- .../internals/StoreChangelogReader.java | 9 +- .../streams/processor/internals/StreamTask.java | 21 ++-- .../processor/internals/StreamThread.java | 26 ++-- .../processor/internals/TaskManager.java | 20 +--- 19 files changed, 251 insertions(+), 203 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 76e4073..c2f2f5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -112,7 +112,7 @@ import java.util.regex.Pattern; * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and * processing records. These processes can either be running on the same machine or they can be * distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances - * sharing the same <code>group.id</code> will be part of the same consumer group. + * sharing the same {@code group.id} will be part of the same consumer group. * <p> * Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the * {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the @@ -152,12 +152,12 @@ import java.util.regex.Pattern; * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, * the consumer sends periodic heartbeats to the server. 
 If the consumer crashes or is unable to send heartbeats for - * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will + * a duration of {@code session.timeout.ms}, then the consumer will be considered dead and its partitions will * be reassigned. * <p> * It is also possible that the consumer could encounter a "livelock" situation where it is continuing * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions - * indefinitely in this case, we provide a liveness detection mechanism using the <code>max.poll.interval.ms</code> + * indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms} * setting. Basically if you don't call poll at least as frequently as the configured max interval, * then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, * you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}). @@ -211,15 +211,15 @@ import java.util.regex.Pattern; * </pre> * * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the - * configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the + * configuration {@code bootstrap.servers}. This list is just used to discover the rest of the brokers in the * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in * case there are servers down when the client is connecting). * <p> - * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by - * the config <code>auto.commit.interval.ms</code>.
+ * Setting {@code enable.auto.commit} means that offsets are committed automatically with a frequency controlled by + * the config {@code auto.commit.interval.ms}. * <p> * In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers - * called <i>test</i> as configured with <code>group.id</code>. + * called <i>test</i> as configured with {@code group.id}. * <p> * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we * are saying that our record's key and value will just be simple strings. @@ -423,8 +423,7 @@ import java.util.regex.Pattern; * <p> * Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically. * In order for this to work, consumers reading from these partitions should be configured to only read committed data. - * This can be achieved by by setting the <code>isolation.level=read_committed</code> in the consumer's configuration. - * </p> + * This can be achieved by setting {@code isolation.level=read_committed} in the consumer's configuration. * * <p> * In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been @@ -433,17 +432,19 @@ import java.util.regex.Pattern; * consumer would be the offset of the first message in the partition belonging to an open transaction. This offset * is known as the 'Last Stable Offset'(LSO).</p> * - * <p>A </p><code>read_committed</code> consumer will only read up till the LSO and filter out any transactional + * <p> + * A {@code read_committed} consumer will only read up to the LSO and filter out any transactional * messages which have been aborted. The LSO also affects the behavior of {@link #seekToEnd(Collection)} and - * {@link #endOffsets(Collection)} for <code>read_committed</code> consumers, details of which are in each method's documentation.
- * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for <code>read_committed</code> consumers.</p> + * {@link #endOffsets(Collection)} for {@code read_committed} consumers, details of which are in each method's documentation. + * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for {@code read_committed} consumers. * - * <p>Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction. + * <p> + * Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction. * There markers are not returned to applications, yet have an offset in the log. As a result, applications reading from * topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction * markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using - * <code>read_committed</code> consumers may also see gaps due to aborted transactions, since those messages would not - * be returned by the consumer and yet would have valid offsets.</p> + * {@code read_committed} consumers may also see gaps due to aborted transactions, since those messages would not + * be returned by the consumer and yet would have valid offsets. 
 * * <h3><a name="multithreaded">Multi-threaded Processing</a></h3> * @@ -869,7 +870,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param topics The list of topics to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics - * @throws IllegalArgumentException If topics is null or contains null or empty elements + * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null + * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { @@ -911,6 +914,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param topics The list of topics to subscribe to * @throws IllegalArgumentException If topics is null or contains null or empty elements + * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Collection<String> topics) { @@ -918,9 +923,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } /** - * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics - * existing at the time of check. - * + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against topics existing at the time of check.
* <p> * As part of group management, the consumer will keep track of the list of consumers that * belong to a particular group and will trigger a rebalance operation if one of the @@ -935,7 +939,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param pattern Pattern to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics - * @throws IllegalArgumentException If pattern is null + * @throws IllegalArgumentException If pattern or listener is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { @@ -956,7 +962,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topics existing at the time of check. - * * <p> * This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer @@ -966,6 +971,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param pattern Pattern to subscribe to * @throws IllegalArgumentException If pattern is null + * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called + * previously (without a subsequent call to {@link #unsubscribe()}) */ @Override public void subscribe(Pattern pattern) { @@ -973,8 +980,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } /** - * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This - * also clears any partitions directly assigned through {@link #assign(Collection)}. 
+ * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. + * This also clears any partitions directly assigned through {@link #assign(Collection)}. */ public void unsubscribe() { acquireAndEnsureOpen(); @@ -991,17 +998,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment * and will replace the previous assignment (if there is one). - * + * <p> * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}. - * * <p> * Manual topic assignment through this method does not use the consumer's group management * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)} * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}. + * <p> + * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new + * assignment replaces the old one. * * @param partitions The list of partitions to assign this consumer * @throws IllegalArgumentException If partitions is null or contains null or empty topics + * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern + * (without a subsequent call to {@link #unsubscribe()}) */ @Override public void assign(Collection<TopicPartition> partitions) { @@ -1289,6 +1300,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API * is invoked for the same partition more than once, the latest offset will be used on the next poll(). 
Note that * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets + * + * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer + * or if provided offset is negative */ @Override public void seek(TopicPartition partition, long offset) { @@ -1307,11 +1321,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called. - * If no partition is provided, seek to the first offset for all of the currently assigned partitions. + * If no partitions are provided, seek to the first offset for all of the currently assigned partitions. + * + * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToBeginning(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { + if (partitions == null) { + throw new IllegalArgumentException("Partitions collection cannot be null"); + } Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to beginning of partition {}", tp); @@ -1325,14 +1344,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called. - * If no partition is provided, seek to the final offset for all of the currently assigned partitions. - * - * If <code>isolation.level=read_committed</code>, the end offset will be the Last Stable Offset, ie. 
the offset + * If no partitions are provided, seek to the final offset for all of the currently assigned partitions. + * <p> + * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset * of the first message with an open transaction. + * + * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToEnd(Collection<TopicPartition> partitions) { acquireAndEnsureOpen(); try { + if (partitions == null) { + throw new IllegalArgumentException("Partitions collection cannot be null"); + } Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to end of partition {}", tp); @@ -1348,6 +1372,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param partition The partition to get the position for * @return The offset + * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for * the partition * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this @@ -1471,6 +1496,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * Note that this method does not affect partition subscription. In particular, it does not cause a group * rebalance when automatic assignment is used. * @param partitions The partitions which should be paused + * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer */ @Override public void pause(Collection<TopicPartition> partitions) { @@ -1490,6 +1516,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * {@link #poll(long)} will return records from these partitions if there are any to be fetched. 
* If the partitions were not previously paused, this method is a no-op. * @param partitions The partitions which should be resumed + * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer */ @Override public void resume(Collection<TopicPartition> partitions) { @@ -1534,6 +1561,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. * @throws IllegalArgumentException if the target timestamp is negative. + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the configured request timeout * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up * the offsets by timestamp. */ @@ -1564,6 +1593,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param partitions the partitions to get the earliest offsets. * @return The earliest available offsets for the given partitions + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the configured request timeout */ @Override public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { @@ -1581,16 +1612,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * <p> * Notice that this method may block indefinitely if the partition does not exist. * This method does not change the current consumer position of the partitions. - * </p> - * - * <p>When <code>isolation.level=read_committed</code> the last offset will be the Last Stable Offset (LSO). + * <p> + * When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO). * This is the offset of the first message with an open transaction. The LSO moves forward as transactions - * are completed.</p> + * are completed. 
* * @see #seekToEnd(Collection) * * @param partitions the partitions to get the end offsets. * @return The end offsets for the given partitions. + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the configured request timeout */ @Override public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { @@ -1619,7 +1651,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * Tries to close the consumer cleanly within the specified timeout. This method waits up to - * <code>timeout</code> for the consumer to complete pending commits and leave the group. + * {@code timeout} for the consumer to complete pending commits and leave the group. * If auto-commit is enabled, this will commit the current offsets if possible within the * timeout. If the consumer is unable to complete offset commits and gracefully leave the group * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be @@ -1627,9 +1659,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. - * @param timeUnit The time unit for the <code>timeout</code> + * @param timeUnit The time unit for the {@code timeout} * @throws InterruptException If the thread is interrupted before or while this function is called - * @throws IllegalArgumentException If the <code>timeout</code> is negative. + * @throws IllegalArgumentException If the {@code timeout} is negative. 
*/ public void close(long timeout, TimeUnit timeUnit) { if (timeout < 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java index 929b6b9..be840db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.KafkaException; /** * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at - * which data can be sent for long enough for the alloted buffer to be exhausted. + * which data can be sent for long enough for the allocated buffer to be exhausted. 
 */ public class BufferExhaustedException extends KafkaException { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7d51640..7dcec5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -20,7 +20,9 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerMetrics; import org.apache.kafka.clients.producer.internals.RecordAccumulator; @@ -564,12 +566,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } /** - * Sends a list of consumed offsets to the consumer group coordinator, and also marks + * Sends a list of specified offsets to the consumer group coordinator, and also marks * those offsets as part of the current transaction. These offsets will be considered - * consumed only if the transaction is committed successfully. - * + * committed only if the transaction is committed successfully. The committed offset should + * be the offset of the next message your application will consume, i.e. lastProcessedMessageOffset + 1. + * <p> * This method should be used when you need to batch consumed and produced messages - * together, typically in a consume-transform-produce pattern.
+ * together, typically in a consume-transform-produce pattern. Thus, the specified + * {@code consumerGroupId} should be the same as the config parameter {@code group.id} of the + * {@link KafkaConsumer consumer} in use. Note that the consumer should have {@code enable.auto.commit=false} + * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or + * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits). * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 1e77633..4982033 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -63,43 +63,38 @@ public interface Producer<K, V> extends Closeable { void abortTransaction() throws ProducerFencedException; /** - * Send the given record asynchronously and return a future which will eventually contain the response information.
- * - * @param record The record to send - * @return A future which will eventually contain the response information + * See {@link KafkaProducer#send(ProducerRecord)} */ - public Future<RecordMetadata> send(ProducerRecord<K, V> record); + Future<RecordMetadata> send(ProducerRecord<K, V> record); /** - * Send a record and invoke the given callback when the record has been acknowledged by the server + * See {@link KafkaProducer#send(ProducerRecord, Callback)} */ - public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); + Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); /** - * Flush any accumulated records from the producer. Blocks until all sends are complete. + * See {@link KafkaProducer#flush()} */ - public void flush(); + void flush(); /** - * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change - * over time so this list should not be cached. + * See {@link KafkaProducer#partitionsFor(String)} */ - public List<PartitionInfo> partitionsFor(String topic); + List<PartitionInfo> partitionsFor(String topic); /** - * Return a map of metrics maintained by the producer + * See {@link KafkaProducer#metrics()} */ - public Map<MetricName, ? extends Metric> metrics(); + Map<MetricName, ? extends Metric> metrics(); /** - * Close this producer + * See {@link KafkaProducer#close()} */ - public void close(); + void close(); /** - * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the - * timeout, fail any pending send requests and force close the producer. 
+ * See {@link KafkaProducer#close(long, TimeUnit)} */ - public void close(long timeout, TimeUnit unit); + void close(long timeout, TimeUnit unit); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index 1cd5b24..ef53b89 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -29,7 +29,6 @@ public final class KafkaMetric implements Metric { private MetricConfig config; KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) { - super(); this.metricName = metricName; this.lock = lock; this.measurable = measurable; http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 7ee5424..9cf0e5c 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -22,11 +22,9 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; import joptsimple.OptionSpecBuilder; - import kafka.admin.AdminClient; import kafka.admin.TopicCommand; import kafka.utils.ZkUtils; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index b31a3e3..5aec3c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -52,7 +54,6 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.StreamsMetadata; -import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider; import org.apache.kafka.streams.state.internals.QueryableStoreProvider; import org.apache.kafka.streams.state.internals.StateStoreProvider; @@ -328,6 +329,7 @@ public class KafkaStreams { * An app can set a single {@link 
KafkaStreams.StateListener} so that the app is notified when state changes. * * @param listener a new state listener + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setStateListener(final KafkaStreams.StateListener listener) { if (state == State.CREATED) { @@ -342,6 +344,7 @@ public class KafkaStreams { * terminates due to an uncaught exception. * * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { if (state == State.CREATED) { @@ -362,6 +365,7 @@ public class KafkaStreams { * processing. * * @param globalStateRestoreListener The listener triggered when {@link StateStore} is being restored. + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. 
*/ public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) { if (state == State.CREATED) { @@ -491,6 +495,7 @@ public class KafkaStreams { * * @param topology the topology specifying the computational logic * @param props properties for {@link StreamsConfig} + * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, final Properties props) { @@ -502,6 +507,7 @@ public class KafkaStreams { * * @param topology the topology specifying the computational logic * @param config the Kafka Streams configuration + * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, final StreamsConfig config) { @@ -515,6 +521,7 @@ public class KafkaStreams { * @param config the Kafka Streams configuration * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients * for the new {@code KafkaStreams} instance + * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, final StreamsConfig config, @@ -524,69 +531,33 @@ public class KafkaStreams { private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final StreamsConfig config, - final KafkaClientSupplier clientSupplier) { - // create the metrics - final Time time = Time.SYSTEM; - - processId = UUID.randomUUID(); - + final KafkaClientSupplier clientSupplier) throws StreamsException { this.config = config; // The application ID is a required config and hence should always have value final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - - internalTopologyBuilder.setApplicationId(applicationId); + processId = UUID.randomUUID(); String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); - if (clientId.length() <= 0) + if (clientId.length() <= 0) { clientId = applicationId + "-" + processId; + } - this.logPrefix = String.format("stream-client [%s] ", clientId); - + 
this.logPrefix = String.format("stream-client [%s]", clientId); final LogContext logContext = new LogContext(logPrefix); - this.log = logContext.logger(getClass()); + final String cleanupThreadName = clientId + "-CleanupThread"; - final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(JMX_PREFIX)); - - final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) - .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) - .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); - - metrics = new Metrics(metricConfig, reporters, time); - - threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; - final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length); - GlobalStreamThread.State globalThreadState = null; - - final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>(); - streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); - - final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + internalTopologyBuilder.setApplicationId(applicationId); + // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception + internalTopologyBuilder.build(null); - if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { + long cacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + if (cacheSize < 0) { + cacheSize = 0; log.warn("Negative cache size passed in. 
Reverting to cache size of 0 bytes."); } - final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / - (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1))); - - stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); - if (globalTaskTopology != null) { - final String globalThreadId = clientId + "-GlobalStreamThread"; - globalStreamThread = new GlobalStreamThread(globalTaskTopology, - config, - clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), - stateDirectory, - metrics, - time, - globalThreadId); - globalThreadState = globalStreamThread.state(); - } - final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() { @Override public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { @@ -610,6 +581,44 @@ public class KafkaStreams { } }; + threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; + try { + stateDirectory = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + Time.SYSTEM); + } catch (final ProcessorStateException fatal) { + throw new StreamsException(fatal); + } + streamsMetadataState = new StreamsMetadataState( + internalTopologyBuilder, + parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); + + final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + 
reporters.add(new JmxReporter(JMX_PREFIX)); + metrics = new Metrics(metricConfig, reporters, Time.SYSTEM); + + GlobalStreamThread.State globalThreadState = null; + final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + if (globalTaskTopology != null) { + final String globalThreadId = clientId + "-GlobalStreamThread"; + globalStreamThread = new GlobalStreamThread(globalTaskTopology, + config, + clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), + stateDirectory, + metrics, + Time.SYSTEM, + globalThreadId); + globalThreadState = globalStreamThread.state(); + } + + final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length); + final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>(); for (int i = 0; i < threads.length; i++) { threads[i] = StreamThread.create(internalTopologyBuilder, config, @@ -617,14 +626,15 @@ public class KafkaStreams { processId, clientId, metrics, - time, + Time.SYSTEM, streamsMetadataState, - cacheSizeBytes, + cacheSize / (threads.length + (globalTaskTopology == null ? 
0 : 1)), stateDirectory, delegatingStateRestoreListener); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } + final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); if (globalTaskTopology != null) { globalStreamThread.setStateListener(streamStateListener); @@ -635,7 +645,6 @@ public class KafkaStreams { final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores()); queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); - final String cleanupThreadName = clientId + "-CleanupThread"; stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(final Runnable r) { @@ -690,7 +699,8 @@ public class KafkaStreams { * {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}. * @throws IllegalStateException if process was already started - * @throws StreamsException if the Kafka brokers have version 0.10.0.x + * @throws StreamsException if the Kafka brokers have version 0.10.0.x or + * if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers */ public synchronized void start() throws IllegalStateException, StreamsException { log.debug("Starting Streams client"); @@ -858,7 +868,7 @@ public class KafkaStreams { * <p> * Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}. 
* - * @throws IllegalStateException if the instance is currently running + * @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running} */ public void cleanUp() { if (isRunning()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 446f941..cde416d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -342,10 +342,10 @@ public class StreamsConfig extends AbstractConfig { Importance.MEDIUM, CLIENT_ID_DOC) .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, - Type.CLASS, - LogAndFailExceptionHandler.class.getName(), - Importance.MEDIUM, - DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + Type.CLASS, + LogAndFailExceptionHandler.class.getName(), + Importance.MEDIUM, + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_KEY_SERDE_CLASS_CONFIG, Type.CLASS, Serdes.ByteArraySerde.class.getName(), http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 3c8e077..9e853fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -46,12 +46,11 @@ public abstract class 
AbstractProcessorContext implements InternalProcessorConte final StateManager stateManager; public AbstractProcessorContext(final TaskId taskId, - final String applicationId, - final StreamsConfig config, - final StreamsMetrics metrics, - final StateManager stateManager, - final ThreadCache cache) { - + final String applicationId, + final StreamsConfig config, + final StreamsMetrics metrics, + final StateManager stateManager, + final ThreadCache cache) { this.taskId = taskId; this.applicationId = applicationId; this.config = config; @@ -93,7 +92,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte } @Override - public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) { + public void register(final StateStore store, + final boolean loggingEnabled, + final StateRestoreCallback stateRestoreCallback) { if (initialized) { throw new IllegalStateException("Can only create state stores during initialization."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 5ed9aae..6734da6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -226,6 +226,7 @@ public abstract class AbstractTask implements Task { * @throws ProcessorStateException if there is an error while closing the state manager * @param writeCheckpoint boolean indicating if a checkpoint file should be written */ + // visible for testing void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { 
ProcessorStateException exception = null; log.trace("Closing state manager"); http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 2d886b7..3208f93 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -137,7 +137,7 @@ class AssignedTasks { restoredPartitions.addAll(restored); for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) { final Map.Entry<TaskId, Task> entry = it.next(); - Task task = entry.getValue(); + final Task task = entry.getValue(); if (restoredPartitions.containsAll(task.changelogPartitions())) { transitionToRunning(task); resume.addAll(task.partitions()); @@ -303,11 +303,12 @@ class AssignedTasks { builder.append("\n"); } - private List<Task> allInitializedTasks() { + private List<Task> allTasks() { final List<Task> tasks = new ArrayList<>(); tasks.addAll(running.values()); tasks.addAll(suspended.values()); tasks.addAll(restoring.values()); + tasks.addAll(created.values()); return tasks; } @@ -428,8 +429,7 @@ class AssignedTasks { } void close(final boolean clean) { - close(allInitializedTasks(), clean); - close(created.values(), clean); + close(allTasks(), clean); clear(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index d47af88..81d2f6c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1744,7 +1744,7 @@ public class InternalTopologyBuilder { private String subtopologiesAsString() { final StringBuilder sb = new StringBuilder(); - sb.append("Sub-topologies: \n"); + sb.append("Sub-topologies:\n"); if (subtopologies.isEmpty()) { sb.append(" none\n"); } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index d26511c..889b6d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -42,7 +42,6 @@ public class RecordQueue { private final ArrayDeque<StampedRecord> fifoQueue; private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker; private final SourceNodeRecordDeserializer recordDeserializer; - private final DeserializationExceptionHandler deserializationExceptionHandler; private final ProcessorContext processorContext; private long partitionTime = TimestampTracker.NOT_KNOWN; @@ -58,11 +57,9 @@ public class RecordQueue { this.fifoQueue = new ArrayDeque<>(); this.timeTracker = new MinTimestampTracker<>(); this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler); - this.deserializationExceptionHandler = deserializationExceptionHandler; this.processorContext = processorContext; } - /** * 
Returns the corresponding source node in the topology * @@ -87,15 +84,15 @@ public class RecordQueue { * @param rawRecords the raw records * @return the size of this queue */ - public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { - for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) { + int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { + for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) { - ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord); + final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord); if (record == null) { continue; } - long timestamp = timestampExtractor.extract(record, timeTracker.get()); + final long timestamp = timestampExtractor.extract(record, timeTracker.get()); log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); // drop message if TS is invalid, i.e., negative @@ -103,7 +100,7 @@ public class RecordQueue { continue; } - StampedRecord stampedRecord = new StampedRecord(record, timestamp); + final StampedRecord stampedRecord = new StampedRecord(record, timestamp); fifoQueue.addLast(stampedRecord); timeTracker.addElement(stampedRecord); } @@ -111,10 +108,11 @@ public class RecordQueue { // update the partition timestamp if its currently // tracked min timestamp has exceed its value; this will // usually only take effect for the first added batch - long timestamp = timeTracker.get(); + final long timestamp = timeTracker.get(); - if (timestamp > partitionTime) + if (timestamp > partitionTime) { partitionTime = timestamp; + } return size(); } @@ -125,19 +123,21 @@ public class RecordQueue { * @return StampedRecord */ public StampedRecord poll() { - StampedRecord elem = fifoQueue.pollFirst(); + final StampedRecord elem = fifoQueue.pollFirst(); - if (elem == null) + if (elem == null) { return null; + } 
timeTracker.removeElement(elem); // only advance the partition timestamp if its currently // tracked min timestamp has exceeded its value - long timestamp = timeTracker.get(); + final long timestamp = timeTracker.get(); - if (timestamp > partitionTime) + if (timestamp > partitionTime) { partitionTime = timestamp; + } return elem; } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java index 1d9e722..7fde881 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java @@ -63,12 +63,12 @@ class SourceNodeRecordDeserializer implements RecordDeserializer { } public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext, - ConsumerRecord<byte[], byte[]> rawRecord) { + final ConsumerRecord<byte[], byte[]> rawRecord) { // catch and process if we have a deserialization handler try { return deserialize(rawRecord); - } catch (Exception e) { + } catch (final Exception e) { final DeserializationExceptionHandler.DeserializationHandlerResponse response = deserializationExceptionHandler.handle(processorContext, rawRecord, e); if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 033af24..98ec810 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -63,6 +63,15 @@ public class StandbyTask extends AbstractTask { processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); } + @Override + public boolean initialize() { + initializeStateStores(); + checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); + processorContext.initialized(); + taskInitialized = true; + return true; + } + /** * <pre> * - update offset limits @@ -139,16 +148,6 @@ public class StandbyTask extends AbstractTask { } @Override - public boolean maybePunctuateStreamTime() { - throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask"); - } - - @Override - public boolean maybePunctuateSystemTime() { - throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask"); - } - - @Override public boolean commitNeeded() { return false; } @@ -174,16 +173,18 @@ public class StandbyTask extends AbstractTask { } @Override - public boolean process() { - throw new UnsupportedOperationException("process not supported by StandbyTasks"); + public boolean maybePunctuateStreamTime() { + throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask"); } - public boolean initialize() { - initializeStateStores(); - checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); - processorContext.initialized(); - taskInitialized = true; - return true; + @Override + public boolean maybePunctuateSystemTime() { + throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask"); + } + + @Override + 
public boolean process() { + throw new UnsupportedOperationException("process not supported by StandbyTasks"); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 8ecc7e2..caa0100 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -185,7 +185,9 @@ public class StoreChangelogReader implements ChangelogReader { needsRestoring.putAll(initialized); } - private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) { + private void logRestoreOffsets(final TopicPartition partition, + final long startingOffset, + final Long endOffset) { log.debug("Restoring partition {} from offset {} to endOffset {}", partition, startingOffset, @@ -229,7 +231,7 @@ public class StoreChangelogReader implements ChangelogReader { } private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords, - final TopicPartition topicPartition) { + final TopicPartition topicPartition) { final StateRestorer restorer = stateRestorers.get(topicPartition); final Long endOffset = endOffsets.get(topicPartition); final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset); @@ -255,7 +257,8 @@ public class StoreChangelogReader implements ChangelogReader { } private long processNext(final List<ConsumerRecord<byte[], byte[]>> records, - final StateRestorer restorer, final Long endOffset) { + final StateRestorer restorer, + final Long endOffset) { final 
List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(); long nextPosition = -1; http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 084a991..0830aa2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -131,7 +131,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // initialize the topology with its own context processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); - final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); + final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); for (final TopicPartition partition : partitions) { final SourceNode source = topology.source(partition.topic()); @@ -151,6 +151,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } } + public boolean initialize() { + log.debug("Initializing"); + initializeStateStores(); + initTopology(); + processorContext.initialized(); + taskInitialized = true; + return topology.stateStores().isEmpty(); + } + + /** * <pre> * - re-initialize the task @@ -597,13 +607,4 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator return new RecordCollectorImpl(producer, id.toString(), logContext); } - public boolean initialize() { - log.debug("Initializing"); - initializeStateStores(); - 
initTopology(); - processorContext.initialized(); - taskInitialized = true; - return topology.stateStores().isEmpty(); - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 867359b..e141c46 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -421,7 +421,6 @@ public class StreamThread extends Thread implements ThreadDataProvider { } return threadProducer; - } @Override @@ -456,20 +455,29 @@ public class StreamThread extends Thread implements ThreadDataProvider { } @Override - StandbyTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) { + StandbyTask createTask(final Consumer<byte[], byte[]> consumer, + final TaskId taskId, + final Set<TopicPartition> partitions) { taskCreatedSensor.record(); final ProcessorTopology topology = builder.build(taskId.topicGroupId); if (!topology.stateStores().isEmpty()) { - return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory); + return new StandbyTask(taskId, + applicationId, + partitions, + topology, + consumer, + storeChangelogReader, + config, + streamsMetrics, + stateDirectory); } else { - log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions); - + log.trace("Skipped standby task {} with assigned partitions {} " + + "since it does not have any state stores to materialize", taskId, partitions); return null; } } 
- } /** @@ -781,8 +789,10 @@ public class StreamThread extends Thread implements ThreadDataProvider { final long processLatency = computeLatency(); streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, timerStartedMs); - processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed, - processLatency, commitTimeMs); + processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, + totalProcessed, + processLatency, + commitTimeMs); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 7afbecf..f12ed91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -110,7 +110,7 @@ class TaskManager { newTasks.put(taskId, partitions); } } catch (final StreamsException e) { - log.error("Failed to create an active task {} due to the following error:", taskId, e); + log.error("Failed to resume an active task {} due to the following error:", taskId, e); throw e; } } else { @@ -122,6 +122,7 @@ class TaskManager { return; } + // CANNOT FIND RETRY AND BACKOFF LOGIC // create all newly assigned tasks (guard against race condition with other thread via backoff and retry) // -> other thread will call removeSuspendedTasks(); eventually log.trace("New active tasks to be created: {}", newTasks); @@ -185,24 +186,13 @@ class TaskManager { firstException.compareAndSet(null, active.suspend()); firstException.compareAndSet(null, standby.suspend()); // remove the changelog partitions from restore consumer - 
firstException.compareAndSet(null, unAssignChangeLogPartitions()); + restoreConsumer.assign(Collections.<TopicPartition>emptyList()); if (firstException.get() != null) { throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get()); } } - private RuntimeException unAssignChangeLogPartitions() { - try { - // un-assign the change log partitions - restoreConsumer.assign(Collections.<TopicPartition>emptyList()); - } catch (final RuntimeException e) { - log.error("Failed to un-assign change log partitions due to the following error:", e); - return e; - } - return null; - } - void shutdown(final boolean clean) { log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(), active.previousTaskIds(), standby.previousTaskIds()); @@ -215,9 +205,9 @@ class TaskManager { log.error("Failed to close KafkaStreamClient due to the following error:", e); } // remove the changelog partitions from restore consumer - unAssignChangeLogPartitions(); + restoreConsumer.assign(Collections.<TopicPartition>emptyList()); taskCreator.close(); - + standbyTaskCreator.close(); } Set<TaskId> suspendedActiveTaskIds() {