This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c7bf4e MINOR: code cleanup (#6053)
1c7bf4e is described below
commit 1c7bf4e4976e2b58826f68f1abe8ffc9fd41692c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 9 18:03:16 2019 +0100
MINOR: code cleanup (#6053)
Reviewers: Bill Bejeck <[email protected]>, John Roesler
<[email protected]>, Ryanne Dolan <[email protected]>, Guozhang Wang
<[email protected]>
---
gradle/dependencies.gradle | 4 +-
.../org/apache/kafka/streams/StreamsBuilder.java | 8 +-
.../kafka/streams/kstream/KGroupedStream.java | 1 -
.../streams/kstream/internals/AbstractStream.java | 53 +++---
.../internals/ForwardingCacheFlushListener.java | 4 +-
.../kstream/internals/KTableKTableInnerJoin.java | 7 +-
.../internals/KeyValueStoreMaterializer.java | 7 +-
.../internals/graph/StreamStreamJoinNode.java | 22 +--
.../internals/graph/TableProcessorNode.java | 6 +-
.../kstream/internals/graph/TableSourceNode.java | 14 +-
.../internals/InternalTopologyBuilder.java | 69 ++++----
.../processor/internals/ProcessorContextImpl.java | 183 ++++++++++++++-------
.../processor/internals/ProcessorStateManager.java | 19 ++-
.../internals/assignment/SubscriptionInfo.java | 6 +-
.../kafka/streams/state/QueryableStoreTypes.java | 40 +++--
.../apache/kafka/streams/state/WindowStore.java | 11 +-
.../state/internals/CachingKeyValueStore.java | 27 +--
.../internals/ChangeLoggingKeyValueBytesStore.java | 21 +--
.../internals/InMemoryKeyValueLoggedStore.java | 133 ---------------
.../state/internals/InMemoryKeyValueStore.java | 4 -
.../state/internals/KeyValueStoreBuilder.java | 1 +
.../streams/state/internals/MemoryLRUCache.java | 27 ++-
.../state/internals/MeteredKeyValueStore.java | 32 ++--
.../state/internals/MeteredSessionStore.java | 64 +++----
.../state/internals/MeteredWindowStore.java | 62 ++++---
.../state/internals/QueryableStoreProvider.java | 8 +-
.../streams/state/internals/RocksDBStore.java | 9 +-
.../streams/state/internals/StoreChangeLogger.java | 12 +-
.../state/internals/WindowStoreBuilder.java | 23 +--
.../state/internals/WrappingStoreProvider.java | 8 +-
.../assignment/StickyTaskAssignorTest.java | 4 +-
31 files changed, 435 insertions(+), 454 deletions(-)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2c2488f..c7faec5 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,7 +57,7 @@ versions += [
jetty: "9.4.12.v20180830",
jersey: "2.27",
jmh: "1.21",
- hamcrest: "1.3",
+ hamcrest: "2.1",
log4j: "1.2.17",
scalaLogging: "3.9.0",
jaxb: "2.3.0",
@@ -119,7 +119,7 @@ libs += [
jmhGeneratorAnnProcess:
"org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
junit: "junit:junit:$versions.junit",
- hamcrest: "org.hamcrest:hamcrest-all:1.3",
+ hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest",
kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 14b63e2..442c87c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -113,7 +113,7 @@ public class StreamsBuilder {
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String>
topics) {
- return stream(topics, Consumed.<K, V>with(null, null, null, null));
+ return stream(topics, Consumed.with(null, null, null, null));
}
/**
@@ -155,7 +155,7 @@ public class StreamsBuilder {
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Pattern
topicPattern) {
- return stream(topicPattern, Consumed.<K, V>with(null, null));
+ return stream(topicPattern, Consumed.with(null, null));
}
/**
@@ -250,7 +250,7 @@ public class StreamsBuilder {
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic) {
- return table(topic, new ConsumedInternal<K, V>());
+ return table(topic, new ConsumedInternal<>());
}
/**
@@ -356,7 +356,7 @@ public class StreamsBuilder {
* @return a {@link GlobalKTable} for the specified topic
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String
topic) {
- return globalTable(topic, Consumed.<K, V>with(null, null));
+ return globalTable(topic, Consumed.with(null, null));
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 7b69e03..05e4ac9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -115,7 +115,6 @@ public interface KGroupedStream<K, V> {
*/
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes,
byte[]>> materialized);
-
/**
* Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index e870751..f087b79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -95,48 +95,35 @@ public abstract class AbstractStream<K, V> {
}
static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final
ValueJoiner<T1, T2, R> joiner) {
- return new ValueJoiner<T2, T1, R>() {
- @Override
- public R apply(final T2 value2, final T1 value1) {
- return joiner.apply(value1, value2);
- }
- };
+ return (value2, value1) -> joiner.apply(value1, value2);
}
static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final
ValueMapper<V, VR> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
- return new ValueMapperWithKey<K, V, VR>() {
- @Override
- public VR apply(final K readOnlyKey, final V value) {
- return valueMapper.apply(value);
- }
- };
+ return (readOnlyKey, value) -> valueMapper.apply(value);
}
static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR>
toValueTransformerWithKeySupplier(
final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier,
"valueTransformerSupplier can't be null");
- return new ValueTransformerWithKeySupplier<K, V, VR>() {
- @Override
- public ValueTransformerWithKey<K, V, VR> get() {
- final ValueTransformer<V, VR> valueTransformer =
valueTransformerSupplier.get();
- return new ValueTransformerWithKey<K, V, VR>() {
- @Override
- public void init(final ProcessorContext context) {
- valueTransformer.init(context);
- }
-
- @Override
- public VR transform(final K readOnlyKey, final V value) {
- return valueTransformer.transform(value);
- }
-
- @Override
- public void close() {
- valueTransformer.close();
- }
- };
- }
+ return () -> {
+ final ValueTransformer<V, VR> valueTransformer =
valueTransformerSupplier.get();
+ return new ValueTransformerWithKey<K, V, VR>() {
+ @Override
+ public void init(final ProcessorContext context) {
+ valueTransformer.init(context);
+ }
+
+ @Override
+ public VR transform(final K readOnlyKey, final V value) {
+ return valueTransformer.transform(value);
+ }
+
+ @Override
+ public void close() {
+ valueTransformer.close();
+ }
+ };
};
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
index 4065ced..eee61fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -30,7 +30,9 @@ class ForwardingCacheFlushListener<K, V> implements
CacheFlushListener<K, V> {
}
@Override
- public void apply(final K key, final V newValue, final V oldValue) {
+ public void apply(final K key,
+ final V newValue,
+ final V oldValue) {
final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index a7bbb56..58110b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -28,12 +28,7 @@ import org.slf4j.LoggerFactory;
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1, V2> {
private static final Logger LOG =
LoggerFactory.getLogger(KTableKTableInnerJoin.class);
- private final KeyValueMapper<K, V1, K> keyValueMapper = new
KeyValueMapper<K, V1, K>() {
- @Override
- public K apply(final K key, final V1 value) {
- return key;
- }
- };
+ private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) ->
key;
KTableKTableInnerJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index c8cd35d..67872be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -38,9 +38,10 @@ public class KeyValueStoreMaterializer<K, V> {
final String name = materialized.storeName();
supplier = Stores.persistentKeyValueStore(name);
}
- final StoreBuilder<KeyValueStore<K, V>> builder =
Stores.keyValueStoreBuilder(supplier,
-
materialized.keySerde(),
-
materialized.valueSerde());
+ final StoreBuilder<KeyValueStore<K, V>> builder =
Stores.keyValueStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
if (materialized.loggingEnabled()) {
builder.withLoggingEnabled(materialized.logConfig());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 46cf3e7..d080c18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -35,16 +35,16 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
private final Joined<K, V1, V2> joined;
- StreamStreamJoinNode(final String nodeName,
- final ValueJoiner<? super V1, ? super V2, ? extends
VR> valueJoiner,
- final ProcessorParameters<K, V1>
joinThisProcessorParameters,
- final ProcessorParameters<K, V2>
joinOtherProcessParameters,
- final ProcessorParameters<K, VR>
joinMergeProcessorParameters,
- final ProcessorParameters<K, V1>
thisWindowedStreamProcessorParameters,
- final ProcessorParameters<K, V2>
otherWindowedStreamProcessorParameters,
- final StoreBuilder<WindowStore<K, V1>>
thisWindowStoreBuilder,
- final StoreBuilder<WindowStore<K, V2>>
otherWindowStoreBuilder,
- final Joined<K, V1, V2> joined) {
+ private StreamStreamJoinNode(final String nodeName,
+ final ValueJoiner<? super V1, ? super V2, ?
extends VR> valueJoiner,
+ final ProcessorParameters<K, V1>
joinThisProcessorParameters,
+ final ProcessorParameters<K, V2>
joinOtherProcessParameters,
+ final ProcessorParameters<K, VR>
joinMergeProcessorParameters,
+ final ProcessorParameters<K, V1>
thisWindowedStreamProcessorParameters,
+ final ProcessorParameters<K, V2>
otherWindowedStreamProcessorParameters,
+ final StoreBuilder<WindowStore<K, V1>>
thisWindowStoreBuilder,
+ final StoreBuilder<WindowStore<K, V2>>
otherWindowStoreBuilder,
+ final Joined<K, V1, V2> joined) {
super(nodeName,
valueJoiner,
@@ -89,7 +89,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
topologyBuilder.addStateStore(otherWindowStoreBuilder,
otherWindowedStreamProcessorName, thisProcessorName);
}
- public static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2,
VR> streamStreamJoinNodeBuilder() {
+ public static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR>
streamStreamJoinNodeBuilder() {
return new StreamStreamJoinNodeBuilder<>();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index eb3328a..0409c62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -66,7 +66,11 @@ public class TableProcessorNode<K, V, S extends StateStore>
extends StreamsGraph
final boolean shouldMaterialize = materializedInternal != null &&
materializedInternal.queryableStoreName() != null;
if (shouldMaterialize) {
// TODO: we are enforcing this as a keyvalue store, but it should
go beyond any type of stores
- topologyBuilder.addStateStore(new
KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes,
byte[]>>) materializedInternal).materialize(), processorName);
+ topologyBuilder.addStateStore(
+ new KeyValueStoreMaterializer<>(
+ (MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>)
materializedInternal
+ ).materialize(),
+ processorName);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 6c213a7..53061dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -39,13 +39,13 @@ public class TableSourceNode<K, V> extends
StreamSourceNode<K, V> {
private final String sourceName;
private final boolean isGlobalKTable;
- TableSourceNode(final String nodeName,
- final String sourceName,
- final String topic,
- final ConsumedInternal<K, V> consumedInternal,
- final MaterializedInternal<K, V, ?> materializedInternal,
- final ProcessorParameters<K, V> processorParameters,
- final boolean isGlobalKTable) {
+ private TableSourceNode(final String nodeName,
+ final String sourceName,
+ final String topic,
+ final ConsumedInternal<K, V> consumedInternal,
+ final MaterializedInternal<K, V, ?>
materializedInternal,
+ final ProcessorParameters<K, V>
processorParameters,
+ final boolean isGlobalKTable) {
super(nodeName,
Collections.singletonList(topic),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index b1f9496..0648fec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -50,13 +50,10 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
-
public class InternalTopologyBuilder {
private static final Logger log =
LoggerFactory.getLogger(InternalTopologyBuilder.class);
-
private static final Pattern EMPTY_ZERO_LENGTH_PATTERN =
Pattern.compile("");
-
private static final String[] NO_PREDECESSORS = {};
// node factories in a topological order
@@ -706,13 +703,15 @@ public class InternalTopologyBuilder {
// in the map; this scenario is possible, for example, that a state
store underlying a source KTable is
// connecting to a join operator whose source topic is not the
original KTable's source topic but an internal repartition topic.
- if (stateStoreNameToSourceTopics.containsKey(stateStoreName) ||
stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
+ if (stateStoreNameToSourceTopics.containsKey(stateStoreName)
+ || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
return;
}
final Set<String> sourceTopics = new HashSet<>();
final Set<Pattern> sourcePatterns = new HashSet<>();
- final Set<SourceNodeFactory> sourceNodesForPredecessor =
findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
+ final Set<SourceNodeFactory> sourceNodesForPredecessor =
+
findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
for (final SourceNodeFactory sourceNodeFactory :
sourceNodesForPredecessor) {
if (sourceNodeFactory.pattern != null) {
@@ -1019,7 +1018,9 @@ public class InternalTopologyBuilder {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the
application id
final String internalTopic = decorateTopic(topic);
- repartitionTopics.put(internalTopic, new
RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
+ repartitionTopics.put(
+ internalTopic,
+ new RepartitionTopicConfig(internalTopic,
Collections.emptyMap()));
sourceTopics.add(internalTopic);
} else {
sourceTopics.add(topic);
@@ -1038,14 +1039,16 @@ public class InternalTopologyBuilder {
}
}
- // if the node is connected to a state store whose changelog
topics are not predefined, add to the changelog topics
+ // if the node is connected to a state store whose changelog
topics are not predefined,
+ // add to the changelog topics
for (final StateStoreFactory stateFactory :
stateFactories.values()) {
if (stateFactory.loggingEnabled() &&
stateFactory.users().contains(node)) {
final String topicName =
storeToChangelogTopic.containsKey(stateFactory.name()) ?
storeToChangelogTopic.get(stateFactory.name())
:
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
if (!stateChangelogTopics.containsKey(topicName)) {
- final InternalTopicConfig internalTopicConfig =
createChangelogTopicConfig(stateFactory, topicName);
+ final InternalTopicConfig internalTopicConfig =
+ createChangelogTopicConfig(stateFactory,
topicName);
stateChangelogTopics.put(topicName,
internalTopicConfig);
}
}
@@ -1066,7 +1069,8 @@ public class InternalTopologyBuilder {
// Adjust the generated topology based on the configs.
// Not exposed as public API and should be removed post 2.0
private void adjust(final StreamsConfig config) {
- final boolean enableOptimization20 =
config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
+ final boolean enableOptimization20 =
+
config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
if (enableOptimization20) {
for (final Map.Entry<StoreBuilder, String> entry :
storeToSourceChangelogTopic.entrySet()) {
@@ -1084,9 +1088,12 @@ public class InternalTopologyBuilder {
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry :
nodeToSourcePatterns.entrySet()) {
- final SourceNodeFactory sourceNode = (SourceNodeFactory)
nodeFactories.get(stringPatternEntry.getKey());
+ final SourceNodeFactory sourceNode =
+ (SourceNodeFactory)
nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from
given regex
- nodeToSourceTopics.put(stringPatternEntry.getKey(),
sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+ nodeToSourceTopics.put(
+ stringPatternEntry.getKey(),
+ sourceNode.getTopics(subscriptionUpdates.getUpdates()));
log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
}
}
@@ -1108,7 +1115,9 @@ public class InternalTopologyBuilder {
if (storeTopics != null) {
updatedTopicsForStateStore.addAll(storeTopics);
}
- stateStoreNameToSourceTopics.put(storePattern.getKey(),
Collections.unmodifiableSet(updatedTopicsForStateStore));
+ stateStoreNameToSourceTopics.put(
+ storePattern.getKey(),
+
Collections.unmodifiableSet(updatedTopicsForStateStore));
}
}
}
@@ -1205,11 +1214,11 @@ public class InternalTopologyBuilder {
return applicationId + "-" + topic;
}
- public SubscriptionUpdates subscriptionUpdates() {
+ SubscriptionUpdates subscriptionUpdates() {
return subscriptionUpdates;
}
- public synchronized Pattern sourceTopicPattern() {
+ synchronized Pattern sourceTopicPattern() {
if (topicPattern == null) {
final List<String> allSourceTopics = new ArrayList<>();
if (!nodeToSourceTopics.isEmpty()) {
@@ -1263,7 +1272,9 @@ public class InternalTopologyBuilder {
return description;
}
- private void describeGlobalStore(final TopologyDescription description,
final Set<String> nodes, final int id) {
+ private void describeGlobalStore(final TopologyDescription description,
+ final Set<String> nodes,
+ final int id) {
final Iterator<String> it = nodes.iterator();
while (it.hasNext()) {
final String node = it.next();
@@ -1314,7 +1325,8 @@ public class InternalTopologyBuilder {
private final static NodeComparator NODE_COMPARATOR = new NodeComparator();
- private static void updateSize(final AbstractNode node, final int delta) {
+ private static void updateSize(final AbstractNode node,
+ final int delta) {
node.size += delta;
for (final TopologyDescription.Node predecessor : node.predecessors())
{
@@ -1523,7 +1535,8 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
- return "Processor: " + name + " (stores: " + stores + ")\n
--> " + nodeNames(successors) + "\n <-- " + nodeNames(predecessors);
+ return "Processor: " + name + " (stores: " + stores + ")\n
--> "
+ + nodeNames(successors) + "\n <-- " +
nodeNames(predecessors);
}
@Override
@@ -1592,7 +1605,8 @@ public class InternalTopologyBuilder {
if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return "Sink: " + name + " (topic: " + topic() + ")\n <--
" + nodeNames(predecessors);
}
- return "Sink: " + name + " (extractor class: " +
topicNameExtractor + ")\n <-- " + nodeNames(predecessors);
+ return "Sink: " + name + " (extractor class: " +
topicNameExtractor + ")\n <-- "
+ + nodeNames(predecessors);
}
@Override
@@ -1678,8 +1692,8 @@ public class InternalTopologyBuilder {
}
public static class TopicsInfo {
- public final Set<String> sinkTopics;
- public final Set<String> sourceTopics;
+ final Set<String> sinkTopics;
+ final Set<String> sourceTopics;
public final Map<String, InternalTopicConfig> stateChangelogTopics;
public final Map<String, InternalTopicConfig> repartitionSourceTopics;
@@ -1775,10 +1789,8 @@ public class InternalTopologyBuilder {
int globalStoresIndex = sortedGlobalStores.length - 1;
while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
sb.append(" ");
- final TopologyDescription.Subtopology subtopology =
- sortedSubtopologies[subtopologiesIndex];
- final TopologyDescription.GlobalStore globalStore =
- sortedGlobalStores[globalStoresIndex];
+ final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
+ final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
if (subtopology.id() == expectedId) {
sb.append(subtopology);
subtopologiesIndex--;
@@ -1789,15 +1801,13 @@ public class InternalTopologyBuilder {
expectedId++;
}
while (subtopologiesIndex != -1) {
- final TopologyDescription.Subtopology subtopology =
- sortedSubtopologies[subtopologiesIndex];
+ final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
sb.append(" ");
sb.append(subtopology);
subtopologiesIndex--;
}
while (globalStoresIndex != -1) {
- final TopologyDescription.GlobalStore globalStore =
- sortedGlobalStores[globalStoresIndex];
+ final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
sb.append(" ");
sb.append(globalStore);
globalStoresIndex--;
@@ -1868,7 +1878,8 @@ public class InternalTopologyBuilder {
}
}
- public void updateSubscribedTopics(final Set<String> topics, final String
logPrefix) {
+ void updateSubscribedTopics(final Set<String> topics,
+ final String logPrefix) {
final SubscriptionUpdates subscriptionUpdates = new
SubscriptionUpdates();
log.debug("{}found {} topics possibly matching regex", logPrefix,
topics);
// update the topic groups with the returned subscription set for
regex pattern subscriptions
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f3e5160..8076725 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -35,10 +35,10 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import
org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
import java.time.Duration;
import java.util.List;
-import
org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
@@ -96,12 +96,12 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
if (!currentNode().stateStores.contains(name)) {
throw new StreamsException("Processor " + currentNode().name() + "
has no access to StateStore " + name +
- " as the store is not connected to the processor. If you
add stores manually via '.addStateStore()' " +
- "make sure to connect the added store to the processor by
providing the processor name to " +
- "'.addStateStore()' or connect them via
'.connectProcessorAndStateStores()'. " +
- "DSL users need to provide the store name to '.process()',
'.transform()', or '.transformValues()' " +
- "to connect the store to the corresponding operator. If
you do not add stores manually, " +
- "please file a bug report at
https://issues.apache.org/jira/projects/KAFKA.");
+ " as the store is not connected to the processor. If you add
stores manually via '.addStateStore()' " +
+ "make sure to connect the added store to the processor by
providing the processor name to " +
+ "'.addStateStore()' or connect them via
'.connectProcessorAndStateStores()'. " +
+ "DSL users need to provide the store name to '.process()',
'.transform()', or '.transformValues()' " +
+ "to connect the store to the corresponding operator. If you do
not add stores manually, " +
+ "please file a bug report at
https://issues.apache.org/jira/projects/KAFKA.");
}
final StateStore store = stateManager.getStore(name);
@@ -118,25 +118,35 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final K key, final V value) {
+ public <K, V> void forward(final K key,
+ final V value) {
forward(key, value, SEND_TO_ALL);
}
@SuppressWarnings({"unchecked", "deprecation"})
@Override
- public <K, V> void forward(final K key, final V value, final int
childIndex) {
- forward(key, value, To.child(((List<ProcessorNode>)
currentNode().children()).get(childIndex).name()));
+ public <K, V> void forward(final K key,
+ final V value,
+ final int childIndex) {
+ forward(
+ key,
+ value,
+ To.child(((List<ProcessorNode>)
currentNode().children()).get(childIndex).name()));
}
@SuppressWarnings({"unchecked", "deprecation"})
@Override
- public <K, V> void forward(final K key, final V value, final String
childName) {
+ public <K, V> void forward(final K key,
+ final V value,
+ final String childName) {
forward(key, value, To.child(childName));
}
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final K key, final V value, final To to) {
+ public <K, V> void forward(final K key,
+ final V value,
+ final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
@@ -148,8 +158,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
if (sendTo != null) {
final ProcessorNode child = currentNode().getChild(sendTo);
if (child == null) {
- throw new StreamsException("Unknown downstream node: " +
sendTo + " either does not exist or is not" +
- " connected to this processor.");
+ throw new StreamsException("Unknown downstream node: " +
sendTo
+ + " either does not exist or is not connected to this
processor.");
}
forward(child, key, value);
} else {
@@ -182,7 +192,9 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
@Override
@Deprecated
- public Cancellable schedule(final long interval, final PunctuationType
type, final Punctuator callback) {
+ public Cancellable schedule(final long interval,
+ final PunctuationType type,
+ final Punctuator callback) {
if (interval < 1) {
throw new IllegalArgumentException("The minimum supported
scheduling interval is 1 millisecond.");
}
@@ -210,7 +222,7 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
private abstract static class StateStoreReadOnlyDecorator<T extends
StateStore> extends AbstractStateStore {
static final String ERROR_MESSAGE = "Global store is read only";
- StateStoreReadOnlyDecorator(final T inner) {
+ private StateStoreReadOnlyDecorator(final T inner) {
super(inner);
}
@@ -225,7 +237,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public void init(final ProcessorContext context, final StateStore
root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -235,8 +248,11 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
}
- private static class KeyValueStoreReadOnlyDecorator<K, V> extends
StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V>
{
- KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
+ private static class KeyValueStoreReadOnlyDecorator<K, V>
+ extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>>
+ implements KeyValueStore<K, V> {
+
+ private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V>
inner) {
super(inner);
}
@@ -246,7 +262,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public KeyValueIterator<K, V> range(final K from, final K to) {
+ public KeyValueIterator<K, V> range(final K from,
+ final K to) {
return getInner().range(from, to);
}
@@ -261,12 +278,14 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public void put(final K key, final V value) {
+ public void put(final K key,
+ final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
- public V putIfAbsent(final K key, final V value) {
+ public V putIfAbsent(final K key,
+ final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -281,35 +300,47 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
}
- private static class WindowStoreReadOnlyDecorator<K, V> extends
StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
- WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
+ private static class WindowStoreReadOnlyDecorator<K, V>
+ extends StateStoreReadOnlyDecorator<WindowStore<K, V>>
+ implements WindowStore<K, V> {
+
+ private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
super(inner);
}
@Override
- public void put(final K key, final V value) {
+ public void put(final K key,
+ final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
- public void put(final K key, final V value, final long
windowStartTimestamp) {
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
- public V fetch(final K key, final long time) {
+ public V fetch(final K key,
+ final long time) {
return getInner().fetch(key, time);
}
@Deprecated
@Override
- public WindowStoreIterator<V> fetch(final K key, final long timeFrom,
final long timeTo) {
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
return getInner().fetch(key, timeFrom, timeTo);
}
@Deprecated
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K
to, final long timeFrom, final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
return getInner().fetch(from, to, timeFrom, timeTo);
}
@@ -320,23 +351,32 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
@Deprecated
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
return getInner().fetchAll(timeFrom, timeTo);
}
}
- private static class SessionStoreReadOnlyDecorator<K, AGG> extends
StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K,
AGG> {
- SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
+ private static class SessionStoreReadOnlyDecorator<K, AGG>
+ extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>>
+ implements SessionStore<K, AGG> {
+
+ private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG>
inner) {
super(inner);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime, final long latestSessionStartTime) {
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
return getInner().findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K
keyFrom, final K keyTo, final long earliestSessionEndTime, final long
latestSessionStartTime) {
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
return getInner().findSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
}
@@ -346,7 +386,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+ public void put(final Windowed<K> sessionKey,
+ final AGG aggregate) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -356,7 +397,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K
to) {
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+ final K to) {
return getInner().fetch(from, to);
}
}
@@ -364,7 +406,7 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
private abstract static class StateStoreReadWriteDecorator<T extends
StateStore> extends AbstractStateStore {
static final String ERROR_MESSAGE = "This method may only be called by
Kafka Streams";
- StateStoreReadWriteDecorator(final T inner) {
+ private StateStoreReadWriteDecorator(final T inner) {
super(inner);
}
@@ -374,7 +416,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public void init(final ProcessorContext context, final StateStore
root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -384,8 +427,11 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
}
- private static class KeyValueStoreReadWriteDecorator<K, V> extends
StateStoreReadWriteDecorator<KeyValueStore<K, V>> implements KeyValueStore<K,
V> {
- KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+ private static class KeyValueStoreReadWriteDecorator<K, V>
+ extends StateStoreReadWriteDecorator<KeyValueStore<K, V>>
+ implements KeyValueStore<K, V> {
+
+ private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V>
inner) {
super(inner);
}
@@ -395,7 +441,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public KeyValueIterator<K, V> range(final K from, final K to) {
+ public KeyValueIterator<K, V> range(final K from,
+ final K to) {
return wrapped().range(from, to);
}
@@ -410,12 +457,14 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public void put(final K key, final V value) {
+ public void put(final K key,
+ final V value) {
wrapped().put(key, value);
}
@Override
- public V putIfAbsent(final K key, final V value) {
+ public V putIfAbsent(final K key,
+ final V value) {
return wrapped().putIfAbsent(key, value);
}
@@ -430,35 +479,47 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
}
- private static class WindowStoreReadWriteDecorator<K, V> extends
StateStoreReadWriteDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
- WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+ private static class WindowStoreReadWriteDecorator<K, V>
+ extends StateStoreReadWriteDecorator<WindowStore<K, V>>
+ implements WindowStore<K, V> {
+
+ private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
super(inner);
}
@Override
- public void put(final K key, final V value) {
+ public void put(final K key,
+ final V value) {
wrapped().put(key, value);
}
@Override
- public void put(final K key, final V value, final long
windowStartTimestamp) {
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
wrapped().put(key, value, windowStartTimestamp);
}
@Override
- public V fetch(final K key, final long time) {
+ public V fetch(final K key,
+ final long time) {
return wrapped().fetch(key, time);
}
@Deprecated
@Override
- public WindowStoreIterator<V> fetch(final K key, final long timeFrom,
final long timeTo) {
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
return wrapped().fetch(key, timeFrom, timeTo);
}
@Deprecated
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K
to, final long timeFrom, final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
return wrapped().fetch(from, to, timeFrom, timeTo);
}
@@ -469,23 +530,32 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
@Deprecated
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
}
}
- private static class SessionStoreReadWriteDecorator<K, AGG> extends
StateStoreReadWriteDecorator<SessionStore<K, AGG>> implements SessionStore<K,
AGG> {
- SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+ private static class SessionStoreReadWriteDecorator<K, AGG>
+ extends StateStoreReadWriteDecorator<SessionStore<K, AGG>>
+ implements SessionStore<K, AGG> {
+
+ private SessionStoreReadWriteDecorator(final SessionStore<K, AGG>
inner) {
super(inner);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime, final long latestSessionStartTime) {
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
return wrapped().findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K
keyFrom, final K keyTo, final long earliestSessionEndTime, final long
latestSessionStartTime) {
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
return wrapped().findSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
}
@@ -505,7 +575,8 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
}
@Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K
to) {
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+ final K to) {
return wrapped().fetch(from, to);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index d08779f..cf830fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -96,7 +96,8 @@ public class ProcessorStateManager extends
AbstractStateManager {
}
- public static String storeChangelogTopic(final String applicationId, final
String storeName) {
+ public static String storeChangelogTopic(final String applicationId,
+ final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}
@@ -133,12 +134,13 @@ public class ProcessorStateManager extends
AbstractStateManager {
restoreCallbacks.put(topic, stateRestoreCallback);
} else {
log.trace("Restoring state store {} from changelog topic {}",
storeName, topic);
- final StateRestorer restorer = new StateRestorer(storePartition,
- new
CompositeRestoreListener(stateRestoreCallback),
-
checkpointableOffsets.get(storePartition),
-
offsetLimit(storePartition),
-
store.persistent(),
- storeName);
+ final StateRestorer restorer = new StateRestorer(
+ storePartition,
+ new CompositeRestoreListener(stateRestoreCallback),
+ checkpointableOffsets.get(storePartition),
+ offsetLimit(storePartition),
+ store.persistent(),
+ storeName);
changelogReader.register(restorer);
}
@@ -190,7 +192,8 @@ public class ProcessorStateManager extends
AbstractStateManager {
standbyRestoredOffsets.put(storePartition, lastOffset + 1);
}
- void putOffsetLimit(final TopicPartition partition, final long limit) {
+ void putOffsetLimit(final TopicPartition partition,
+ final long limit) {
log.trace("Updating store offset limit for partition {} to {}",
partition, limit);
offsetLimits.put(partition, limit);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index b4ad19f..03b9e2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -171,7 +171,7 @@ public class SubscriptionInfo {
if (userEndPoint == null) {
return new byte[0];
} else {
- return userEndPoint.getBytes(Charset.forName("UTF-8"));
+ return userEndPoint.getBytes(StandardCharsets.UTF_8);
}
}
@@ -318,7 +318,7 @@ public class SubscriptionInfo {
if (bytesLength != 0) {
final byte[] bytes = new byte[bytesLength];
data.get(bytes);
- subscriptionInfo.userEndPoint = new String(bytes,
Charset.forName("UTF-8"));
+ subscriptionInfo.userEndPoint = new String(bytes,
StandardCharsets.UTF_8);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index d33e324..297e181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
@@ -23,37 +25,40 @@ import
org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
/**
- * Provides access to the {@link QueryableStoreType}s provided with
KafkaStreams. These
- * can be used with {@link org.apache.kafka.streams.KafkaStreams#store(String,
QueryableStoreType)}
- * To access and query the {@link StateStore}s that are part of a Topology
+ * Provides access to the {@link QueryableStoreType}s provided with {@link
KafkaStreams}.
+ * These can be used with {@link KafkaStreams#store(String,
QueryableStoreType)}.
+ * To access and query the {@link StateStore}s that are part of a {@link
Topology}.
*/
-public class QueryableStoreTypes {
+public final class QueryableStoreTypes {
/**
- * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
- * @param <K> key type of the store
- * @param <V> value type of the store
- * @return {@link QueryableStoreTypes.KeyValueStoreType}
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.KeyValueStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>>
keyValueStore() {
return new KeyValueStoreType<>();
}
/**
- * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
- * @param <K> key type of the store
- * @param <V> value type of the store
- * @return {@link QueryableStoreTypes.WindowStoreType}
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.WindowStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>>
windowStore() {
return new WindowStoreType<>();
}
/**
- * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
- * @param <K> key type of the store
- * @param <V> value type of the store
- * @return {@link QueryableStoreTypes.SessionStoreType}
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.SessionStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>>
sessionStore() {
return new SessionStoreType<>();
@@ -104,7 +109,8 @@ public class QueryableStoreTypes {
super(ReadOnlySessionStore.class);
}
@Override
- public ReadOnlySessionStore<K, V> create(final StateStoreProvider
storeProvider, final String storeName) {
+ public ReadOnlySessionStore<K, V> create(final StateStoreProvider
storeProvider,
+ final String storeName) {
return new CompositeReadOnlySessionStore<>(storeProvider, this,
storeName);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index fcbd004..2203f59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -43,16 +43,16 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as
deletes
- * @throws NullPointerException If null is used for key.
+ * @throws NullPointerException if the given key is {@code null}
*/
void put(K key, V value);
/**
- * Put a key-value pair with the given timestamp into the corresponding
window
+ * Put a key-value pair into the window with given window start timestamp
* @param key The key to associate the value to
* @param value The value; can be null
* @param windowStartTimestamp The timestamp of the beginning of the
window to put the key/value into
- * @throws NullPointerException If null is used for key.
+ * @throws NullPointerException if the given key is {@code null}
*/
void put(K key, V value, long windowStartTimestamp);
@@ -87,7 +87,7 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
* @param timeTo time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException If {@code null} is used for key.
+ * @throws NullPointerException if the given key is {@code null}
*/
@SuppressWarnings("deprecation")
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@@ -111,7 +111,7 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>,
value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException If {@code null} is used for any key.
+ * @throws NullPointerException if one of the given keys is {@code null}
*/
@SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long
timeTo);
@@ -132,7 +132,6 @@ public interface WindowStore<K, V> extends StateStore,
ReadOnlyWindowStore<K, V>
* @param timeTo the end of the time slot from which to search
(inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>,
value>}
* @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if {@code null} is used for any key
*/
@SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 8d9b207..a736ab6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -59,7 +59,8 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
initInternal(context);
underlying.init(context, root);
// save the stream thread as we only ever want to trigger a flush
@@ -76,17 +77,15 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
this.cache = this.context.getCache();
this.cacheName =
ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(),
underlying.name());
- cache.addDirtyEntryFlushListener(cacheName, new
ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> entries) {
- for (final ThreadCache.DirtyEntry entry : entries) {
- putAndMaybeForward(entry, (InternalProcessorContext)
context);
- }
+ cache.addDirtyEntryFlushListener(cacheName, entries -> {
+ for (final ThreadCache.DirtyEntry entry : entries) {
+ putAndMaybeForward(entry, (InternalProcessorContext) context);
}
});
}
- private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final
InternalProcessorContext context) {
+ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
+ final InternalProcessorContext context) {
final ProcessorRecordContext current = context.recordContext();
try {
context.setRecordContext(entry.entry().context());
@@ -190,7 +189,8 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
@Override
- public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes
to) {
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator =
underlying.range(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
cache.range(cacheName, from, to);
@@ -217,7 +217,8 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
@Override
- public void put(final Bytes key, final byte[] value) {
+ public void put(final Bytes key,
+ final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
lock.writeLock().lock();
@@ -229,7 +230,8 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
}
- private void putInternal(final Bytes key, final byte[] value) {
+ private void putInternal(final Bytes key,
+ final byte[] value) {
cache.put(
cacheName,
key,
@@ -244,7 +246,8 @@ class CachingKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateStore im
}
@Override
- public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
lock.writeLock().lock();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 1d8fb58..94c250c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -38,19 +38,17 @@ public class ChangeLoggingKeyValueBytesStore extends
WrappedStateStore.AbstractS
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
inner.init(context, root);
final String topic =
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
inner.name());
this.changeLogger = new StoreChangeLogger<>(inner.name(), context, new
StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
// if the inner store is an LRU cache, add the eviction listener to
log removed record
if (inner instanceof MemoryLRUCache) {
- ((MemoryLRUCache<Bytes, byte[]>) inner).whenEldestRemoved(new
MemoryLRUCache.EldestEntryRemovalListener<Bytes, byte[]>() {
- @Override
- public void apply(final Bytes key, final byte[] value) {
- // pass null to indicate removal
- changeLogger.logChange(key, null);
- }
+ ((MemoryLRUCache<Bytes, byte[]>) inner).setWhenEldestRemoved((key,
value) -> {
+ // pass null to indicate removal
+ changeLogger.logChange(key, null);
});
}
}
@@ -61,13 +59,15 @@ public class ChangeLoggingKeyValueBytesStore extends
WrappedStateStore.AbstractS
}
@Override
- public void put(final Bytes key, final byte[] value) {
+ public void put(final Bytes key,
+ final byte[] value) {
inner.put(key, value);
changeLogger.logChange(key, value);
}
@Override
- public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] value) {
final byte[] previous = get(key);
if (previous == null) {
put(key, value);
@@ -96,7 +96,8 @@ public class ChangeLoggingKeyValueBytesStore extends
WrappedStateStore.AbstractS
}
@Override
- public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes
to) {
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
return inner.range(from, to);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
deleted file mode 100644
index 4dccd6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.List;
-
-public class InMemoryKeyValueLoggedStore<K, V> extends
WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
-
- private final KeyValueStore<K, V> inner;
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
-
- private StoreChangeLogger<K, V> changeLogger;
-
- public InMemoryKeyValueLoggedStore(final KeyValueStore<K, V> inner, final
Serde<K> keySerde, final Serde<V> valueSerde) {
- super(inner);
- this.inner = inner;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(final ProcessorContext context, final StateStore root) {
- inner.init(context, root);
-
- // construct the serde
- final StateSerdes<K, V> serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(),
inner.name()),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
- this.changeLogger = new StoreChangeLogger<>(inner.name(), context,
serdes);
-
- // if the inner store is an LRU cache, add the eviction listener to
log removed record
- if (inner instanceof MemoryLRUCache) {
- ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new
MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(final K key, final V value) {
- removed(key);
- }
- });
- }
- }
-
- @Override
- public long approximateNumEntries() {
- return inner.approximateNumEntries();
- }
-
- @Override
- public V get(final K key) {
- return this.inner.get(key);
- }
-
- @Override
- public void put(final K key, final V value) {
- this.inner.put(key, value);
-
- changeLogger.logChange(key, value);
- }
-
- @Override
- public V putIfAbsent(final K key, final V value) {
- final V originalValue = this.inner.putIfAbsent(key, value);
- if (originalValue == null) {
- changeLogger.logChange(key, value);
- }
- return originalValue;
- }
-
- @Override
- public void putAll(final List<KeyValue<K, V>> entries) {
- this.inner.putAll(entries);
-
- for (final KeyValue<K, V> entry : entries) {
- final K key = entry.key;
- changeLogger.logChange(key, entry.value);
- }
- }
-
- @Override
- public V delete(final K key) {
- final V value = this.inner.delete(key);
-
- removed(key);
-
- return value;
- }
-
- /**
- * Called when the underlying {@link #inner} {@link KeyValueStore} removes
an entry in response to a call from this
- * store.
- *
- * @param key the key for the entry that the inner store removed
- */
- protected void removed(final K key) {
- changeLogger.logChange(key, null);
- }
-
- @Override
- public KeyValueIterator<K, V> range(final K from, final K to) {
- return this.inner.range(from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return this.inner.all();
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 5a6b618..d6dd42a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -52,10 +52,6 @@ public class InMemoryKeyValueStore<K, V> implements
KeyValueStore<K, V> {
this.map = new TreeMap<>();
}
- public KeyValueStore<K, V> enableLogging() {
- return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
- }
-
@Override
public String name() {
return this.name;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
index c790ee9..31169d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+
import java.util.Objects;
public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V,
KeyValueStore<K, V>> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 80078b7..f0c3c8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -43,18 +43,17 @@ import java.util.Objects;
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
public interface EldestEntryRemovalListener<K, V> {
-
void apply(K key, V value);
}
- private final Serde<K> keySerde;
+ private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final String name;
protected final Map<K, V> map;
private StateSerdes<K, V> serdes;
- private boolean restoring = false; // TODO: this is a sub-optimal
solution to avoid logging during restoration.
- // in the future we should augment
the StateRestoreCallback with onComplete etc to better resolve this.
+ private boolean restoring = false; // TODO: this is a sub-optimal solution
to avoid logging during restoration.
+ // in the future we should augment the
StateRestoreCallback with onComplete etc to better resolve this.
private volatile boolean open = true;
private EldestEntryRemovalListener<K, V> listener;
@@ -82,14 +81,8 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
};
}
- KeyValueStore<K, V> enableLogging() {
- return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
- }
-
- MemoryLRUCache<K, V> whenEldestRemoved(final EldestEntryRemovalListener<K,
V> listener) {
+ void setWhenEldestRemoved(final EldestEntryRemovalListener<K, V> listener)
{
this.listener = listener;
-
- return this;
}
@Override
@@ -99,7 +92,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K,
V> {
@Override
@SuppressWarnings("unchecked")
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
// construct the serde
this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name),
@@ -137,7 +131,8 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public synchronized void put(final K key, final V value) {
+ public synchronized void put(final K key,
+ final V value) {
Objects.requireNonNull(key);
if (value == null) {
this.map.remove(key);
@@ -147,7 +142,8 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
}
@Override
- public synchronized V putIfAbsent(final K key, final V value) {
+ public synchronized V putIfAbsent(final K key,
+ final V value) {
Objects.requireNonNull(key);
final V originalValue = get(key);
if (originalValue == null) {
@@ -173,7 +169,8 @@ public class MemoryLRUCache<K, V> implements
KeyValueStore<K, V> {
* @throws UnsupportedOperationException at every invocation
*/
@Override
- public KeyValueIterator<K, V> range(final K from, final K to) {
+ public KeyValueIterator<K, V> range(final K from,
+ final K to) {
throw new UnsupportedOperationException("MemoryLRUCache does not
support range() function.");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 57458fb..f3d1cae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -82,14 +82,14 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- this.metrics = (StreamsMetricsImpl) context.metrics();
+ metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics";
final Map<String, String> taskTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", "all");
final Map<String, String> storeTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", name());
- this.serdes = new StateSerdes<>(
+ serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
@@ -132,9 +132,9 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
public V get(final K key) {
try {
if (getTime.shouldRecord()) {
- return measureLatency(() ->
outerValue(inner.get(Bytes.wrap(serdes.rawKey(key)))), getTime);
+ return measureLatency(() ->
outerValue(inner.get(keyBytes(key))), getTime);
} else {
- return outerValue(inner.get(Bytes.wrap(serdes.rawKey(key))));
+ return outerValue(inner.get(keyBytes(key)));
}
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
@@ -148,11 +148,11 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
try {
if (putTime.shouldRecord()) {
measureLatency(() -> {
- inner.put(Bytes.wrap(serdes.rawKey(key)),
serdes.rawValue(value));
+ inner.put(keyBytes(key), serdes.rawValue(value));
return null;
}, putTime);
} else {
- inner.put(Bytes.wrap(serdes.rawKey(key)),
serdes.rawValue(value));
+ inner.put(keyBytes(key), serdes.rawValue(value));
}
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
@@ -165,10 +165,10 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
final V value) {
if (putIfAbsentTime.shouldRecord()) {
return measureLatency(
- () ->
outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)),
serdes.rawValue(value))),
+ () -> outerValue(inner.putIfAbsent(keyBytes(key),
serdes.rawValue(value))),
putIfAbsentTime);
} else {
- return
outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)),
serdes.rawValue(value)));
+ return outerValue(inner.putIfAbsent(keyBytes(key),
serdes.rawValue(value)));
}
}
@@ -190,9 +190,9 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
public V delete(final K key) {
try {
if (deleteTime.shouldRecord()) {
- return measureLatency(() ->
outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key)))), deleteTime);
+ return measureLatency(() ->
outerValue(inner.delete(keyBytes(key))), deleteTime);
} else {
- return
outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key))));
+ return outerValue(inner.delete(keyBytes(key)));
}
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
@@ -204,13 +204,13 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
public KeyValueIterator<K, V> range(final K from,
final K to) {
return new MeteredKeyValueIterator(
- this.inner.range(Bytes.wrap(serdes.rawKey(from)),
Bytes.wrap(serdes.rawKey(to))),
- this.rangeTime);
+ inner.range(Bytes.wrap(serdes.rawKey(from)),
Bytes.wrap(serdes.rawKey(to))),
+ rangeTime);
}
@Override
public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueIterator(this.inner.all(), this.allTime);
+ return new MeteredKeyValueIterator(inner.all(), allTime);
}
@Override
@@ -245,6 +245,10 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
return value == null ? null : serdes.valueFrom(value);
}
+ private Bytes keyBytes(final K key) {
+ return Bytes.wrap(serdes.rawKey(key));
+ }
+
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K,
V>> from) {
final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, V> entry : from) {
@@ -289,7 +293,7 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
try {
iter.close();
} finally {
- metrics.recordLatency(this.sensor, this.startNs,
time.nanoseconds());
+ metrics.recordLatency(sensor, startNs, time.nanoseconds());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 65ab758..31a039b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -65,12 +65,14 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
//noinspection unchecked
- this.serdes = new
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>)
context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>)
context.valueSerde() : valueSerde);
- this.metrics = (StreamsMetricsImpl) context.metrics();
+ serdes = new StateSerdes<>(
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics";
@@ -88,7 +90,7 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
try {
inner.init(context, root);
} finally {
- this.metrics.recordLatency(
+ metrics.recordLatency(
restoreTime,
startNs,
time.nanoseconds()
@@ -109,13 +111,15 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
final long
latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
final Bytes bytesKey = keyBytes(key);
- return new
MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKey,
-
earliestSessionEndTime,
-
latestSessionStartTime),
- fetchTime,
- metrics,
- serdes,
- time);
+ return new MeteredWindowedKeyValueIterator<>(
+ inner.findSessions(
+ bytesKey,
+ earliestSessionEndTime,
+ latestSessionStartTime),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
}
@Override
@@ -127,14 +131,16 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
Objects.requireNonNull(keyTo, "keyTo cannot be null");
final Bytes bytesKeyFrom = keyBytes(keyFrom);
final Bytes bytesKeyTo = keyBytes(keyTo);
- return new
MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKeyFrom,
-
bytesKeyTo,
-
earliestSessionEndTime,
-
latestSessionStartTime),
- fetchTime,
- metrics,
- serdes,
- time);
+ return new MeteredWindowedKeyValueIterator<>(
+ inner.findSessions(
+ bytesKeyFrom,
+ bytesKeyTo,
+ earliestSessionEndTime,
+ latestSessionStartTime),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
}
@Override
@@ -148,22 +154,23 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
final String message = String.format(e.getMessage(),
sessionKey.key());
throw new ProcessorStateException(message, e);
} finally {
- this.metrics.recordLatency(removeTime, startNs,
time.nanoseconds());
+ metrics.recordLatency(removeTime, startNs, time.nanoseconds());
}
}
@Override
- public void put(final Windowed<K> sessionKey, final V aggregate) {
+ public void put(final Windowed<K> sessionKey,
+ final V aggregate) {
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
final long startNs = time.nanoseconds();
try {
final Bytes key = keyBytes(sessionKey.key());
- this.inner.put(new Windowed<>(key, sessionKey.window()),
serdes.rawValue(aggregate));
+ inner.put(new Windowed<>(key, sessionKey.window()),
serdes.rawValue(aggregate));
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(),
sessionKey.key(), aggregate);
throw new ProcessorStateException(message, e);
} finally {
- this.metrics.recordLatency(this.putTime, startNs,
time.nanoseconds());
+ metrics.recordLatency(putTime, startNs, time.nanoseconds());
}
}
@@ -178,7 +185,8 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
return findSessions(from, to, 0, Long.MAX_VALUE);
@@ -188,9 +196,9 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
public void flush() {
final long startNs = time.nanoseconds();
try {
- this.inner.flush();
+ inner.flush();
} finally {
- this.metrics.recordLatency(this.flushTime, startNs,
time.nanoseconds());
+ metrics.recordLatency(flushTime, startNs, time.nanoseconds());
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index fefa772..166c300 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -66,12 +66,14 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context,
+ final StateStore root) {
this.context = context;
- this.serdes = new
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
- keySerde == null ? (Serde<K>)
context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>)
context.valueSerde() : valueSerde);
- this.metrics = (StreamsMetricsImpl) context.metrics();
+ serdes = new StateSerdes<>(
+ ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
+ keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+ valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+ metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics";
@@ -88,7 +90,7 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
try {
inner.init(context, root);
} finally {
- this.metrics.recordLatency(
+ metrics.recordLatency(
restoreTime,
startNs,
time.nanoseconds()
@@ -103,12 +105,15 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public void put(final K key, final V value) {
+ public void put(final K key,
+ final V value) {
put(key, value, context.timestamp());
}
@Override
- public void put(final K key, final V value, final long
windowStartTimestamp) {
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
final long startNs = time.nanoseconds();
try {
inner.put(keyBytes(key), serdes.rawValue(value),
windowStartTimestamp);
@@ -116,7 +121,7 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
final String message = String.format(e.getMessage(), key, value);
throw new ProcessorStateException(message, e);
} finally {
- metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+ metrics.recordLatency(putTime, startNs, time.nanoseconds());
}
}
@@ -125,7 +130,8 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
@Override
- public V fetch(final K key, final long timestamp) {
+ public V fetch(final K key,
+ final long timestamp) {
final long startNs = time.nanoseconds();
try {
final byte[] result = inner.fetch(keyBytes(key), timestamp);
@@ -134,13 +140,15 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
}
return serdes.valueFrom(result);
} finally {
- metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
+ metrics.recordLatency(fetchTime, startNs, time.nanoseconds());
}
}
@SuppressWarnings("deprecation")
@Override
- public WindowStoreIterator<V> fetch(final K key, final long timeFrom,
final long timeTo) {
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key),
timeFrom, timeTo),
fetchTime,
metrics,
@@ -155,22 +163,28 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
@SuppressWarnings("deprecation")
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
- return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom,
timeTo),
- fetchTime,
- metrics,
- serdes,
- time);
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return new MeteredWindowedKeyValueIterator<>(
+ inner.fetchAll(timeFrom, timeTo),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
}
@SuppressWarnings("deprecation")
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final long timeFrom, final long timeTo) {
- return new
MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to),
timeFrom, timeTo),
- fetchTime,
- metrics,
- serdes,
- time);
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
+ return new MeteredWindowedKeyValueIterator<>(
+ inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
+ fetchTime,
+ metrics,
+ serdes,
+ time);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 4270d27..2bfecd2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -21,9 +21,10 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import static java.util.Collections.singletonList;
+
/**
* A wrapper over all of the {@link StateStoreProvider}s in a Topology
*/
@@ -47,10 +48,11 @@ public class QueryableStoreProvider {
* @param <T> The expected type of the returned store
* @return A composite object that wraps the store instances.
*/
- public <T> T getStore(final String storeName, final QueryableStoreType<T>
queryableStoreType) {
+ public <T> T getStore(final String storeName,
+ final QueryableStoreType<T> queryableStoreType) {
final List<T> globalStore = globalStoreProvider.stores(storeName,
queryableStoreType);
if (!globalStore.isEmpty()) {
- return queryableStoreType.create(new
WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)),
storeName);
+ return queryableStoreType.create(new
WrappingStoreProvider(singletonList(globalStoreProvider)), storeName);
}
final List<T> allStores = new ArrayList<>();
for (final StateStoreProvider storeProvider : storeProviders) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e19bb6d..35fb5bb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -103,7 +103,8 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]> {
this(name, DB_FILE_DIR);
}
- RocksDBStore(final String name, final String parentDir) {
+ RocksDBStore(final String name,
+ final String parentDir) {
this.name = name;
this.parentDir = parentDir;
}
@@ -222,7 +223,6 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]> {
}
void toggleDbForBulkLoading(final boolean prepareForBulkload) {
-
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check
// for bulk loading
@@ -434,7 +434,10 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]> {
}
}
- private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes,
byte[]>> implements KeyValueIterator<Bytes, byte[]> {
+ private class RocksDbIterator
+ extends AbstractIterator<KeyValue<Bytes, byte[]>>
+ implements KeyValueIterator<Bytes, byte[]> {
+
private final String storeName;
private final RocksIterator iter;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 98555ad..74134d6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -39,11 +39,16 @@ class StoreChangeLogger<K, V> {
private final ProcessorContext context;
private final RecordCollector collector;
- StoreChangeLogger(final String storeName, final ProcessorContext context,
final StateSerdes<K, V> serialization) {
+ StoreChangeLogger(final String storeName,
+ final ProcessorContext context,
+ final StateSerdes<K, V> serialization) {
this(storeName, context, context.taskId().partition, serialization);
}
- private StoreChangeLogger(final String storeName, final ProcessorContext
context, final int partition, final StateSerdes<K, V> serialization) {
+ private StoreChangeLogger(final String storeName,
+ final ProcessorContext context,
+ final int partition,
+ final StateSerdes<K, V> serialization) {
this.topic =
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
this.context = context;
this.partition = partition;
@@ -51,7 +56,8 @@ class StoreChangeLogger<K, V> {
this.collector = ((RecordCollector.Supplier)
context).recordCollector();
}
- void logChange(final K key, final V value) {
+ void logChange(final K key,
+ final V value) {
if (collector != null) {
final Serializer<K> keySerializer = serialization.keySerializer();
final Serializer<V> valueSerializer =
serialization.valueSerializer();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index 31d063a..058a249 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-
public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V,
WindowStore<K, V>> {
private final WindowBytesStoreSupplier storeSupplier;
@@ -37,22 +36,24 @@ public class WindowStoreBuilder<K, V> extends
AbstractStoreBuilder<K, V, WindowS
@Override
public WindowStore<K, V> build() {
- return new
MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
- storeSupplier.metricsScope(),
- time,
- keySerde,
- valueSerde);
+ return new MeteredWindowStore<>(
+ maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ storeSupplier.metricsScope(),
+ time,
+ keySerde,
+ valueSerde);
}
private WindowStore<Bytes, byte[]> maybeWrapCaching(final
WindowStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
- return new CachingWindowStore<>(inner,
- keySerde,
- valueSerde,
- storeSupplier.windowSize(),
- storeSupplier.segmentIntervalMs());
+ return new CachingWindowStore<>(
+ inner,
+ keySerde,
+ valueSerde,
+ storeSupplier.windowSize(),
+ storeSupplier.segmentIntervalMs());
}
private WindowStore<Bytes, byte[]> maybeWrapLogging(final
WindowStore<Bytes, byte[]> inner) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 6de39cc..d94e7cc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -30,7 +30,7 @@ public class WrappingStoreProvider implements
StateStoreProvider {
private final List<StateStoreProvider> storeProviders;
- public WrappingStoreProvider(final List<StateStoreProvider>
storeProviders) {
+ WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
this.storeProviders = storeProviders;
}
@@ -42,11 +42,11 @@ public class WrappingStoreProvider implements
StateStoreProvider {
* @param <T> The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore}
* @return a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)}
*/
- public <T> List<T> stores(final String storeName, final
QueryableStoreType<T> type) {
+ public <T> List<T> stores(final String storeName,
+ final QueryableStoreType<T> type) {
final List<T> allStores = new ArrayList<>();
for (final StateStoreProvider provider : storeProviders) {
- final List<T> stores =
- provider.stores(storeName, type);
+ final List<T> stores = provider.stores(storeName, type);
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index d431dbe..17d403f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -33,8 +33,8 @@ import java.util.TreeSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
-import static org.hamcrest.core.IsCollectionContaining.hasItems;
+import static org.hamcrest.core.IsIterableContaining.hasItem;
+import static org.hamcrest.core.IsIterableContaining.hasItems;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertTrue;