This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9752cca KAFKA-6729: Follow up; disable logging for source KTable.
(#5038)
9752cca is described below
commit 9752ccad552543964c2ba92a152cb67636233e13
Author: Guozhang Wang <[email protected]>
AuthorDate: Sun May 20 10:24:07 2018 -0700
KAFKA-6729: Follow up; disable logging for source KTable. (#5038)
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/kstream/internals/InternalStreamsBuilder.java | 3 +++
.../kafka/streams/processor/internals/InternalTopologyBuilder.java | 6 +++++-
.../src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java | 6 +++++-
.../streams/processor/internals/StreamsPartitionAssignorTest.java | 1 -
4 files changed, 13 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 480794c..0a19b4e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,6 +72,9 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>> materialized) {
+ // explicitly disable logging for source table materialized stores
+ materialized.withLoggingDisabled();
+
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new
KeyValueStoreMaterializer<>(materialized)
.materialize();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 575ac01..1651bbd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -120,7 +120,7 @@ public class InternalTopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
- interface StateStoreFactory {
+ public interface StateStoreFactory {
Set<String> users();
boolean loggingEnabled();
StateStore build();
@@ -1799,4 +1799,8 @@ public class InternalTopologyBuilder {
public synchronized Set<String> getSourceTopicNames() {
return sourceTopicNames;
}
+
+ public synchronized Map<String, StateStoreFactory> getStateStores() {
+ return stateFactories;
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0a1e6df..37101de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -279,7 +279,11 @@ public class StreamsBuilderTest {
final InternalTopologyBuilder internalTopologyBuilder =
TopologyWrapper.getInternalTopologyBuilder(builder.build());
-
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("topic")));
+ assertThat(internalTopologyBuilder.getStateStores().keySet(),
equalTo(Collections.singleton("store")));
+
+
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
equalTo(false));
+
+
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
equalTo(true));
}
@Test(expected = TopologyException.class)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index cc507d6..37b03fa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -796,7 +796,6 @@ public class StreamsPartitionAssignorTest {
expectedCreatedInternalTopics.put(applicationId +
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
expectedCreatedInternalTopics.put(applicationId +
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
expectedCreatedInternalTopics.put(applicationId +
"-KSTREAM-MAP-0000000001-repartition", 4);
- expectedCreatedInternalTopics.put("topic3", 4); // the source
topic is reused as changelog topics
// check if all internal topics were created as expected
assertThat(mockInternalTopicManager.readyTopics,
equalTo(expectedCreatedInternalTopics));
--
To stop receiving notification emails like this one, please contact
[email protected].