This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9752cca  KAFKA-6729: Follow up; disable logging for source KTable. (#5038)
9752cca is described below

commit 9752ccad552543964c2ba92a152cb67636233e13
Author: Guozhang Wang <[email protected]>
AuthorDate: Sun May 20 10:24:07 2018 -0700

    KAFKA-6729: Follow up; disable logging for source KTable. (#5038)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/kstream/internals/InternalStreamsBuilder.java     | 3 +++
 .../kafka/streams/processor/internals/InternalTopologyBuilder.java  | 6 +++++-
 .../src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java  | 6 +++++-
 .../streams/processor/internals/StreamsPartitionAssignorTest.java   | 1 -
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 480794c..0a19b4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,6 +72,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        // explicitly disable logging for source table materialized stores
+        materialized.withLoggingDisabled();
+
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new 
KeyValueStoreMaterializer<>(materialized)
                 .materialize();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 575ac01..1651bbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -120,7 +120,7 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    interface StateStoreFactory {
+    public interface StateStoreFactory {
         Set<String> users();
         boolean loggingEnabled();
         StateStore build();
@@ -1799,4 +1799,8 @@ public class InternalTopologyBuilder {
     public synchronized Set<String> getSourceTopicNames() {
         return sourceTopicNames;
     }
+
+    public synchronized Map<String, StateStoreFactory> getStateStores() {
+        return stateFactories;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0a1e6df..37101de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -279,7 +279,11 @@ public class StreamsBuilderTest {
 
         final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(builder.build());
 
-        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
 equalTo(Collections.singleton("topic")));
+        assertThat(internalTopologyBuilder.getStateStores().keySet(), 
equalTo(Collections.singleton("store")));
+
+        
assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
 equalTo(false));
+
+        
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
 equalTo(true));
     }
     
     @Test(expected = TopologyException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index cc507d6..37b03fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -796,7 +796,6 @@ public class StreamsPartitionAssignorTest {
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + 
"-KSTREAM-MAP-0000000001-repartition", 4);
-        expectedCreatedInternalTopics.put("topic3", 4);     // the source 
topic is reused as changelog topics
 
         // check if all internal topics were created as expected
         assertThat(mockInternalTopicManager.readyTopics, 
equalTo(expectedCreatedInternalTopics));

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to