Repository: kafka Updated Branches: refs/heads/trunk 942074b77 -> 5ae97196a
KAFKA-3125: Add Kafka Streams Exceptions Author: Guozhang Wang <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #809 from guozhangwang/K3125 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ae97196 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ae97196 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ae97196 Branch: refs/heads/trunk Commit: 5ae97196ae149e58f6cfa3c5b6d968cbd7cb6787 Parents: 942074b Author: Guozhang Wang <[email protected]> Authored: Tue Jan 26 09:19:28 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Jan 26 09:19:28 2016 -0800 ---------------------------------------------------------------------- .../streams/errors/ProcessorStateException.java | 35 ++++++ .../kafka/streams/errors/StreamsException.java | 40 ++++++ .../streams/errors/TaskAssignmentException.java | 37 ++++++ .../streams/errors/TaskIdFormatException.java | 27 ++++ .../errors/TopologyBuilderException.java | 27 ++++ .../kstream/internals/AbstractStream.java | 4 +- .../kstream/internals/KStreamJoinWindow.java | 14 +-- .../kstream/internals/KTableRepartitionMap.java | 3 +- .../processor/DefaultPartitionGrouper.java | 4 +- .../apache/kafka/streams/processor/TaskId.java | 9 +- .../streams/processor/TopologyBuilder.java | 78 ++++++------ .../streams/processor/TopologyException.java | 38 ------ .../processor/internals/AbstractTask.java | 6 +- .../processor/internals/PartitionGroup.java | 3 +- .../internals/ProcessorContextImpl.java | 16 +-- .../internals/ProcessorStateManager.java | 4 +- .../processor/internals/StandbyContextImpl.java | 3 +- .../internals/StreamPartitionAssignor.java | 9 +- .../processor/internals/StreamThread.java | 125 ++++++++++--------- .../internals/assignment/AssignmentInfo.java | 12 +- .../internals/assignment/SubscriptionInfo.java | 1 + .../assignment/TaskAssignmentException.java | 32 ----- 
.../internals/assignment/TaskAssignor.java | 1 + .../streams/state/internals/RocksDBStore.java | 12 +- .../streams/kstream/KStreamBuilderTest.java | 4 +- .../streams/processor/TopologyBuilderTest.java | 25 ++-- .../apache/kafka/streams/state/StateUtils.java | 2 +- 27 files changed, 333 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java new file mode 100644 index 0000000..6434d04 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.errors; + +public class ProcessorStateException extends StreamsException { + + private static final long serialVersionUID = 1L; + + public ProcessorStateException(String s) { + super(s); + } + + public ProcessorStateException(String s, Throwable throwable) { + super(s, throwable); + } + + public ProcessorStateException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java new file mode 100644 index 0000000..6247886 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.streams.errors; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * StreamsException is the top-level exception type generated by Kafka Streams. + */ +@InterfaceStability.Unstable +public class StreamsException extends KafkaException { + + public StreamsException(String s) { + super(s); + } + + public StreamsException(String s, Throwable throwable) { + super(s, throwable); + } + + public StreamsException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java new file mode 100644 index 0000000..3ae8503 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskAssignmentException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.errors; + +/** + * The run time exception class for stream task assignments + */ +public class TaskAssignmentException extends StreamsException { + + private final static long serialVersionUID = 1L; + + public TaskAssignmentException(String s) { + super(s); + } + + public TaskAssignmentException(String s, Throwable throwable) { + super(s, throwable); + } + + public TaskAssignmentException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java new file mode 100644 index 0000000..bf0ebf5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.errors; + +public class TaskIdFormatException extends StreamsException { + + private static final long serialVersionUID = 1L; + + public TaskIdFormatException(String taskString) { + super("Task id cannot be parsed correctly" + (taskString == null ? "" : " from " + taskString)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java new file mode 100644 index 0000000..9dd740b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors; + +public class TopologyBuilderException extends StreamsException { + + private static final long serialVersionUID = 1L; + + public TopologyBuilderException(String message) { + super("Invalid topology building" + (message == null ? 
"" : ": " + message)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index fa34ba1..c537465 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -41,7 +41,7 @@ public abstract class AbstractStream<K> { Set<String> otherSourceNodes = other.sourceNodes; if (thisSourceNodes == null || otherSourceNodes == null) - throw new KafkaException("not joinable"); + throw new TopologyBuilderException(this.name + " and " + other.name + " are not joinable"); Set<String> allSourceNodes = new HashSet<>(); allSourceNodes.addAll(thisSourceNodes); http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java index 4f427d4..5b83b28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java @@ -17,7 +17,7 @@ package 
org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -27,14 +27,13 @@ import org.apache.kafka.streams.state.WindowStore; class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { private final String windowName; - private final long windowSizeMs; - private final long retentionPeriodMs; - KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) { this.windowName = windowName; - this.windowSizeMs = windowSizeMs; - this.retentionPeriodMs = retentionPeriodMs; + + if (windowSizeMs * 2 > retentionPeriodMs) + throw new TopologyBuilderException("The retention period of the join window " + + windowName + " must at least two times its window size."); } @Override @@ -52,9 +51,6 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> { super.init(context); window = (WindowStore<K, V>) context.getStateStore(windowName); - - if (windowSizeMs * 2 > retentionPeriodMs) - throw new KafkaException("The retention period must be at least two times the join window size."); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index 499f721..ff69c37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -17,7 +17,6 @@ package 
org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -60,7 +59,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli @Override public void enableSendingOldValues() { // this should never be called - throw new KafkaException("KTableRepartitionMap should always require sending old values."); + throw new IllegalStateException("KTableRepartitionMap should always require sending old values."); } private KeyValue<K1, V1> computeValue(K key, V value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index 923a217..47c5e58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -18,9 +18,9 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.StreamsException; import java.util.Collections; import java.util.HashMap; @@ -61,7 +61,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper { List<PartitionInfo> infos = metadata.partitionsForTopic(topic); if (infos == null) - throw new KafkaException("topic not found :" + topic); + throw new StreamsException("Topic not found during partition assignment: " + topic); int numPartitions = 
infos.size(); if (numPartitions > maxNumPartitions) http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index 023bbbb..6e7150e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.errors.TaskIdFormatException; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -38,7 +40,7 @@ public class TaskId implements Comparable<TaskId> { public static TaskId parse(String string) { int index = string.indexOf('_'); - if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(); + if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(string); try { int topicGroupId = Integer.parseInt(string.substring(0, index)); @@ -46,7 +48,7 @@ public class TaskId implements Comparable<TaskId> { return new TaskId(topicGroupId, partition); } catch (Exception e) { - throw new TaskIdFormatException(); + throw new TaskIdFormatException(string); } } @@ -93,7 +95,4 @@ public class TaskId implements Comparable<TaskId> { (this.partition > other.partition ? 
1 : 0))); } - - public static class TaskIdFormatException extends RuntimeException { - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index a6b54b7..7af377f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -17,10 +17,10 @@ package org.apache.kafka.streams.processor; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; @@ -218,11 +218,11 @@ public class TopologyBuilder { */ public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... 
topics) { if (nodeFactories.containsKey(name)) - throw new TopologyException("Processor " + name + " is already added."); + throw new TopologyBuilderException("Processor " + name + " is already added."); for (String topic : topics) { if (sourceTopicNames.contains(topic)) - throw new TopologyException("Topic " + topic + " has already been registered by another source."); + throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source."); sourceTopicNames.add(topic); } @@ -331,15 +331,15 @@ public class TopologyBuilder { */ public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) { if (nodeFactories.containsKey(name)) - throw new TopologyException("Processor " + name + " is already added."); + throw new TopologyBuilderException("Processor " + name + " is already added."); if (parentNames != null) { for (String parent : parentNames) { if (parent.equals(name)) { - throw new TopologyException("Processor " + name + " cannot be a parent of itself."); + throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself."); } if (!nodeFactories.containsKey(parent)) { - throw new TopologyException("Parent processor " + parent + " is not added yet."); + throw new TopologyBuilderException("Parent processor " + parent + " is not added yet."); } } } @@ -361,15 +361,15 @@ public class TopologyBuilder { */ public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... 
parentNames) { if (nodeFactories.containsKey(name)) - throw new TopologyException("Processor " + name + " is already added."); + throw new TopologyBuilderException("Processor " + name + " is already added."); if (parentNames != null) { for (String parent : parentNames) { if (parent.equals(name)) { - throw new TopologyException("Processor " + name + " cannot be a parent of itself."); + throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself."); } if (!nodeFactories.containsKey(parent)) { - throw new TopologyException("Parent processor " + parent + " is not added yet."); + throw new TopologyBuilderException("Parent processor " + parent + " is not added yet."); } } } @@ -388,7 +388,7 @@ public class TopologyBuilder { */ public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) { if (stateFactories.containsKey(supplier.name())) { - throw new TopologyException("StateStore " + supplier.name() + " is already added."); + throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added."); } stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier)); @@ -443,9 +443,9 @@ public class TopologyBuilder { private void connectProcessorAndStateStore(String processorName, String stateStoreName) { if (!stateFactories.containsKey(stateStoreName)) - throw new TopologyException("StateStore " + stateStoreName + " is not added yet."); + throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet."); if (!nodeFactories.containsKey(processorName)) - throw new TopologyException("Processor " + processorName + " is not added yet."); + throw new TopologyBuilderException("Processor " + processorName + " is not added yet."); StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName); Iterator<String> iter = stateStoreFactory.users.iterator(); @@ -459,7 +459,7 @@ public class TopologyBuilder { if (nodeFactory instanceof 
ProcessorNodeFactory) { ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName); } else { - throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node."); + throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node."); } } @@ -611,38 +611,34 @@ public class TopologyBuilder { Map<String, SourceNode> topicSourceMap = new HashMap<>(); Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>(); - try { - // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) - for (NodeFactory factory : nodeFactories.values()) { - if (nodeGroup == null || nodeGroup.contains(factory.name)) { - ProcessorNode node = factory.build(); - processorNodes.add(node); - processorMap.put(node.name(), node); - - if (factory instanceof ProcessorNodeFactory) { - for (String parent : ((ProcessorNodeFactory) factory).parents) { - processorMap.get(parent).addChild(node); - } - for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { - if (!stateStoreMap.containsKey(stateStoreName)) { - stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier); - } - } - } else if (factory instanceof SourceNodeFactory) { - for (String topic : ((SourceNodeFactory) factory).topics) { - topicSourceMap.put(topic, (SourceNode) node); - } - } else if (factory instanceof SinkNodeFactory) { - for (String parent : ((SinkNodeFactory) factory).parents) { - processorMap.get(parent).addChild(node); + // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) + for (NodeFactory factory : nodeFactories.values()) { + if (nodeGroup == null || nodeGroup.contains(factory.name)) { + ProcessorNode node = factory.build(); + processorNodes.add(node); + processorMap.put(node.name(), node); + + if (factory instanceof ProcessorNodeFactory) { + for (String parent : ((ProcessorNodeFactory) 
factory).parents) { + processorMap.get(parent).addChild(node); + } + for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { + if (!stateStoreMap.containsKey(stateStoreName)) { + stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier); } - } else { - throw new TopologyException("Unknown definition class: " + factory.getClass().getName()); } + } else if (factory instanceof SourceNodeFactory) { + for (String topic : ((SourceNodeFactory) factory).topics) { + topicSourceMap.put(topic, (SourceNode) node); + } + } else if (factory instanceof SinkNodeFactory) { + for (String parent : ((SinkNodeFactory) factory).parents) { + processorMap.get(parent).addChild(node); + } + } else { + throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName()); } } - } catch (Exception e) { - throw new KafkaException("ProcessorNode construction failed: this should not happen."); } return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values())); http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java deleted file mode 100644 index 99d1405..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.common.KafkaException; - -public class TopologyException extends KafkaException { - - private static final long serialVersionUID = 1L; - - public TopologyException(String message) { - super(message); - } - - public TopologyException(String name, Object value) { - this(name, value, null); - } - - public TopologyException(String name, Object value, String message) { - super("Invalid topology building" + (message == null ? "" : ": " + message)); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 68680ab..46dd738 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -18,9 +18,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import 
org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; @@ -62,7 +62,7 @@ public abstract class AbstractTask { // if partitions is null, this is a standby task this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby); } catch (IOException e) { - throw new KafkaException("Error while creating the state manager", e); + throw new ProcessorStateException("Error while creating the state manager", e); } } @@ -95,7 +95,7 @@ public abstract class AbstractTask { try { stateMgr.close(recordCollectorOffsets()); } catch (IOException e) { - throw new KafkaException("Error while closing the state manager in processor context", e); + throw new ProcessorStateException("Error while closing the state manager", e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index d888085..b487ff5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -149,7 +148,7 @@ public class PartitionGroup { RecordQueue recordQueue = partitionQueues.get(partition); if (recordQueue == null) - throw new KafkaException("Record's partition does not belong to this partition-group."); + throw new IllegalStateException("Record's 
partition does not belong to this partition-group."); return recordQueue.size(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 102c534..7931a6f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -17,11 +17,11 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -120,7 +120,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S @Override public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { if (initialized) - throw new KafkaException("Can only create state stores during initialization."); + throw new IllegalStateException("Can only create state stores during initialization."); stateMgr.register(store, loggingEnabled, stateRestoreCallback); } @@ -130,10 +130,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S ProcessorNode node = task.node(); if (node == null) - throw new KafkaException("accessing from an unknown 
node"); + throw new TopologyBuilderException("Accessing from an unknown node"); if (!node.stateStores.contains(name)) - throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name); + throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name); return stateMgr.getStore(name); } @@ -141,7 +141,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S @Override public String topic() { if (task.record() == null) - throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed"); + throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed"); return task.record().topic(); } @@ -149,7 +149,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S @Override public int partition() { if (task.record() == null) - throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed"); + throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed"); return task.record().partition(); } @@ -157,7 +157,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S @Override public long offset() { if (this.task.record() == null) - throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed"); + throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed"); return this.task.record().offset(); } @@ -165,7 +165,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S @Override public long timestamp() { if (task.record() == null) - throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed"); + throw new 
IllegalStateException("This should not happen as timestamp() should only be called while a record is processed"); return task.record().timestamp; } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 547bb15..bc7f4b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -19,9 +19,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -160,7 +160,7 @@ public class ProcessorStateManager { } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime); if (partitionNotFound) - throw new KafkaException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition); + throw new StreamsException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition); this.stores.put(store.name(), store); http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 89d185c..133d597 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; @@ -112,7 +111,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup @Override public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { if (initialized) - throw new KafkaException("Can only create state stores during initialization."); + throw new IllegalStateException("Can only create state stores during initialization."); stateMgr.register(store, loggingEnabled, stateRestoreCallback); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 5d87e5a..d499534 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -24,12 +24,13 @@ import org.apache.kafka.common.KafkaException; import 
org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.ClientState; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; -import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException; +import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +133,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable return partitions; } catch (IOException e) { - throw new KafkaException(e); + throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e); } } @@ -158,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE); } catch (JsonProcessingException e) { - throw new KafkaException(e); + throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e); } } @@ -193,7 +194,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data); } catch (JsonProcessingException e) { - throw new KafkaException(e); + throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f118f60..eccd02c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -41,6 +41,9 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskIdFormatException; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -127,6 +130,8 @@ public class StreamThread extends Thread { @Override public void onPartitionsRevoked(Collection<TopicPartition> assignment) { commitAll(); + // TODO: right now upon partition revocation, we always remove all the tasks; + // this behavior can be optimized to only remove affected tasks in the future removeStreamTasks(); removeStandbyTasks(); lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned @@ -229,8 +234,13 @@ public class StreamThread extends Thread { try { runLoop(); - } catch (RuntimeException e) { - log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e); + } catch (KafkaException e) { + // just re-throw the exception as it should be logged already + throw e; + } catch (Exception e) { + // we have caught all Kafka related exceptions, and other runtime exceptions + // should be due to user application errors + log.error("Streams application error during processing in thread 
[" + this.getName() + "]: ", e); throw e; } finally { shutdown(); @@ -251,13 +261,21 @@ public class StreamThread extends Thread { private void shutdown() { log.info("Shutting down stream thread [" + this.getName() + "]"); - // Exceptions should not prevent this call from going through all shutdown steps. + // We need to first remove the tasks before shutting down the underlying clients + // as they may be required in the previous steps; and exceptions should not + // prevent this call from going through all shutdown steps. try { commitAll(); } catch (Throwable e) { // already logged in commitAll() } try { + removeStreamTasks(); + removeStandbyTasks(); + } catch (Throwable e) { + // already logged in removeStreamTasks() and removeStandbyTasks() + } + try { producer.close(); } catch (Throwable e) { log.error("Failed to close producer in thread [" + this.getName() + "]: ", e); @@ -272,70 +290,60 @@ public class StreamThread extends Thread { } catch (Throwable e) { log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e); } - try { - removeStreamTasks(); - removeStandbyTasks(); - } catch (Throwable e) { - // already logged in removeStreamTasks() and removeStandbyTasks() - } log.info("Stream thread shutdown complete [" + this.getName() + "]"); } private void runLoop() { - try { - int totalNumBuffered = 0; - boolean requiresPoll = true; + int totalNumBuffered = 0; + boolean requiresPoll = true; - ensureCopartitioning(builder.copartitionGroups()); + ensureCopartitioning(builder.copartitionGroups()); - consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener); + consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener); - while (stillRunning()) { - // try to fetch some records if necessary - if (requiresPoll) { - requiresPoll = false; + while (stillRunning()) { + // try to fetch some records if necessary + if (requiresPoll) { + requiresPoll = false; - long startPoll = time.milliseconds(); + long startPoll = 
time.milliseconds(); - ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); + ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); - if (!records.isEmpty()) { - for (TopicPartition partition : records.partitions()) { - StreamTask task = activeTasksByPartition.get(partition); - task.addRecords(partition, records.records(partition)); - } + if (!records.isEmpty()) { + for (TopicPartition partition : records.partitions()) { + StreamTask task = activeTasksByPartition.get(partition); + task.addRecords(partition, records.records(partition)); } - - long endPoll = time.milliseconds(); - sensors.pollTimeSensor.record(endPoll - startPoll); } - totalNumBuffered = 0; + long endPoll = time.milliseconds(); + sensors.pollTimeSensor.record(endPoll - startPoll); + } - if (!activeTasks.isEmpty()) { - // try to process one record from each task - for (StreamTask task : activeTasks.values()) { - long startProcess = time.milliseconds(); + totalNumBuffered = 0; - totalNumBuffered += task.process(); - requiresPoll = requiresPoll || task.requiresPoll(); + if (!activeTasks.isEmpty()) { + // try to process one record from each task + for (StreamTask task : activeTasks.values()) { + long startProcess = time.milliseconds(); - sensors.processTimeSensor.record(time.milliseconds() - startProcess); - } + totalNumBuffered += task.process(); + requiresPoll = requiresPoll || task.requiresPoll(); - maybePunctuate(); - } else { - // even when no task is assigned, we must poll to get a task. - requiresPoll = true; + sensors.processTimeSensor.record(time.milliseconds() - startProcess); } - maybeCommit(); - maybeUpdateStandbyTasks(); - maybeClean(); + maybePunctuate(); + } else { + // even when no task is assigned, we must poll to get a task. 
+ requiresPoll = true; } - } catch (Exception e) { - throw new KafkaException(e); + maybeCommit(); + maybeUpdateStandbyTasks(); + + maybeClean(); } } @@ -396,8 +404,8 @@ public class StreamThread extends Thread { if (task.maybePunctuate(now)) sensors.punctuateTimeSensor.record(time.milliseconds() - now); - } catch (Exception e) { - log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); + } catch (KafkaException e) { + log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } } @@ -418,7 +426,7 @@ public class StreamThread extends Thread { try { if (task.commitNeeded()) commitOne(task, time.milliseconds()); - } catch (Exception e) { + } catch (KafkaException e) { log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } @@ -444,7 +452,7 @@ public class StreamThread extends Thread { private void commitOne(AbstractTask task, long now) { try { task.commit(); - } catch (Exception e) { + } catch (KafkaException e) { log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } @@ -485,7 +493,7 @@ public class StreamThread extends Thread { } } } - } catch (TaskId.TaskIdFormatException e) { + } catch (TaskIdFormatException e) { // there may be some unknown files that sits in the same directory, // we should ignore these files instead trying to delete them as well } @@ -523,7 +531,7 @@ public class StreamThread extends Thread { if (new File(dir, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists()) tasks.add(id); - } catch (TaskId.TaskIdFormatException e) { + } catch (TaskIdFormatException e) { // there may be some unknown files that sits in the same directory, // we should ignore these files instead trying to delete them as well } @@ -543,7 +551,7 @@ public class StreamThread extends Thread { private void 
addStreamTasks(Collection<TopicPartition> assignment) { if (partitionAssignor == null) - throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen."); + throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen."); HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>(); @@ -570,7 +578,7 @@ public class StreamThread extends Thread { for (TopicPartition partition : partitions) activeTasksByPartition.put(partition, task); - } catch (Exception e) { + } catch (StreamsException e) { log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e); throw e; } @@ -578,7 +586,6 @@ public class StreamThread extends Thread { } private void removeStreamTasks() { - // TODO: change this clearing tasks behavior for (StreamTask task : activeTasks.values()) { closeOne(task); } @@ -594,7 +601,7 @@ public class StreamThread extends Thread { log.info("Removing a task {}", task.id()); try { task.close(); - } catch (Exception e) { + } catch (StreamsException e) { log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); throw e; } @@ -615,7 +622,7 @@ public class StreamThread extends Thread { private void addStandbyTasks() { if (partitionAssignor == null) - throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen."); + throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen."); Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>(); @@ -673,14 +680,14 @@ public class StreamThread extends Thread { List<PartitionInfo> infos = consumer.partitionsFor(topic); if (infos == null) - throw new KafkaException("topic not found: " + topic); + throw new TopologyBuilderException("Topic not found: " + 
topic); if (numPartitions == -1) { numPartitions = infos.size(); } else if (numPartitions != infos.size()) { String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); Arrays.sort(topics); - throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]"); + throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]"); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 2bd4457..c2175bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -17,9 +17,9 @@ package org.apache.kafka.streams.processor.internals.assignment; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.ByteBufferInputStream; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,12 +87,12 @@ public class AssignmentInfo { return ByteBuffer.wrap(baos.toByteArray()); } else { - TaskAssignmentException ex = new TaskAssignmentException("unable to encode assignment data: version=" + version); + TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version); log.error(ex.getMessage(), ex); throw ex; } } catch (IOException ex) { - throw 
new KafkaException("failed to encode AssignmentInfo", ex); + throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex); } } @@ -128,12 +128,12 @@ public class AssignmentInfo { return new AssignmentInfo(activeTasks, standbyTasks); } else { - TaskAssignmentException ex = new TaskAssignmentException("unknown assignment data version: " + version); + TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version); log.error(ex.getMessage(), ex); throw ex; } } catch (IOException ex) { - throw new KafkaException("failed to decode AssignmentInfo", ex); + throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 43009a1..ccd2f73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; +import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java deleted file mode 100644 index 839a6c2..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.processor.internals.assignment; - -import org.apache.kafka.common.KafkaException; - -/** - * The run time exception class for stream task assignments - */ -public class TaskAssignmentException extends KafkaException { - - private final static long serialVersionUID = 1L; - - public TaskAssignmentException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java index d1e0782..2501677 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; +import org.apache.kafka.streams.errors.TaskAssignmentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 6c77ab2..dea7e0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.KafkaException; import 
org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -102,13 +102,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { dir.getParentFile().mkdirs(); return RocksDB.open(options, dir.toString()); } else { - throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based."); + throw new ProcessorStateException("Change log is not supported for store " + this.name + " since it is TTL based."); // TODO: support TTL with change log? // return TtlDB.open(options, dir.toString(), ttl, false); } } catch (RocksDBException e) { // TODO: this needs to be handled more accurately - throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e); + throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e); } } @@ -128,7 +128,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { return serdes.valueFrom(this.db.get(serdes.rawKey(key))); } catch (RocksDBException e) { // TODO: this needs to be handled more accurately - throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e); + throw new ProcessorStateException("Error while executing get " + key.toString() + " from store " + this.name, e); } } @@ -142,7 +142,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } } catch (RocksDBException e) { // TODO: this needs to be handled more accurately - throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e); + throw new ProcessorStateException("Error while executing put " + key.toString() + " from store " + this.name, e); } } @@ -177,7 +177,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { 
db.flush(fOptions); } catch (RocksDBException e) { // TODO: this needs to be handled more accurately - throw new KafkaException("Error while executing flush from store " + this.name, e); + throw new ProcessorStateException("Error while executing flush from store " + this.name, e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index f79063f..e75b595 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.internals.KStreamImpl; -import org.apache.kafka.streams.processor.TopologyException; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; @@ -28,7 +28,7 @@ import static org.junit.Assert.assertEquals; public class KStreamBuilderTest { - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testFrom() { final KStreamBuilder builder = new KStreamBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index a2f6ec0..a93b8ab 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; @@ -40,7 +41,7 @@ import static org.junit.Assert.assertTrue; public class TopologyBuilderTest { - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddSourceWithSameName() { final TopologyBuilder builder = new TopologyBuilder(); @@ -48,7 +49,7 @@ public class TopologyBuilderTest { builder.addSource("source", "topic-2"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddSourceWithSameTopic() { final TopologyBuilder builder = new TopologyBuilder(); @@ -56,7 +57,7 @@ public class TopologyBuilderTest { builder.addSource("source-2", "topic-1"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddProcessorWithSameName() { final TopologyBuilder builder = new TopologyBuilder(); @@ -65,21 +66,21 @@ public class TopologyBuilderTest { builder.addProcessor("processor", new MockProcessorSupplier(), "source"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddProcessorWithWrongParent() { final TopologyBuilder builder = new TopologyBuilder(); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddProcessorWithSelfParent() { final TopologyBuilder builder = new TopologyBuilder(); 
builder.addProcessor("processor", new MockProcessorSupplier(), "processor"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddSinkWithSameName() { final TopologyBuilder builder = new TopologyBuilder(); @@ -88,14 +89,14 @@ public class TopologyBuilderTest { builder.addSink("sink", "topic-3", "source"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddSinkWithWrongParent() { final TopologyBuilder builder = new TopologyBuilder(); builder.addSink("sink", "topic-2", "source"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddSinkWithSelfParent() { final TopologyBuilder builder = new TopologyBuilder(); @@ -145,14 +146,14 @@ public class TopologyBuilderTest { assertEquals(3, builder.sourceTopics().size()); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddStateStoreWithNonExistingProcessor() { final TopologyBuilder builder = new TopologyBuilder(); builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddStateStoreWithSource() { final TopologyBuilder builder = new TopologyBuilder(); @@ -160,7 +161,7 @@ public class TopologyBuilderTest { builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void testAddStateStoreWithSink() { final TopologyBuilder builder = new TopologyBuilder(); @@ -168,7 +169,7 @@ public class TopologyBuilderTest { builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1"); } - @Test(expected = TopologyException.class) + @Test(expected = TopologyBuilderException.class) public void 
testAddStateStoreWithDuplicates() { final TopologyBuilder builder = new TopologyBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5ae97196/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java index f342dcd..c014ae5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java @@ -50,7 +50,7 @@ public class StateUtils { }); return dir; } catch (IOException ex) { - throw new RuntimeException("failed to create a temp dir", ex); + throw new RuntimeException("Failed to create a temp dir", ex); } }
