[ https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584145#comment-16584145 ]
ASF GitHub Bot commented on KAFKA-6998: --------------------------------------- guozhangwang closed pull request #5488: KAFKA-6998: Disable Caching when max.cache.bytes are zero. URL: https://github.com/apache/kafka/pull/5488 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9071c9db2bf..33e037c3ced 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1749,7 +1749,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time if (!parts.isEmpty()) return parts; - Timer timer = time.timer(requestTimeoutMs); + Timer timer = time.timer(timeout); Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata( new MetadataRequest.Builder(Collections.singletonList(topic), true), timer); return topicMetadata.get(topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2cefb35905c..82323d9081d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -635,10 +635,6 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, this.config = config; this.time = time; - // adjust the topology if optimization is turned on. 
- // TODO: to be removed post 2.0 - internalTopologyBuilder.adjust(config); - // The application ID is a required config and hence should always have value processId = UUID.randomUUID(); final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); @@ -667,7 +663,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, reporters.add(new JmxReporter(JMX_PREFIX)); metrics = new Metrics(metricConfig, reporters, time); - internalTopologyBuilder.setApplicationId(applicationId); + // re-write the physical topology according to the config + internalTopologyBuilder.rewriteTopology(config); // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception internalTopologyBuilder.build(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 99d616f60e4..edff470a41e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -66,7 +66,10 @@ // state factories private final Map<String, StateStoreFactory> stateFactories = new HashMap<>(); - // global state factories + // built global state stores + private final Map<String, StoreBuilder> globalStateBuilders = new LinkedHashMap<>(); + + // built global state stores private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>(); // all topics subscribed from source processors (without application-id prefix for internal topics) @@ -326,6 +329,7 @@ Sink describe() { } } + // public for testing only public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) { Objects.requireNonNull(applicationId, "applicationId can't be null"); this.applicationId = applicationId; @@ -333,6 +337,35 @@ public synchronized 
final InternalTopologyBuilder setApplicationId(final String return this; } + public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsConfig config) { + Objects.requireNonNull(config, "config can't be null"); + + // set application id + setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); + + // maybe strip out caching layers + if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) { + for (final StateStoreFactory storeFactory : stateFactories.values()) { + storeFactory.builder.withCachingDisabled(); + } + + for (final StoreBuilder storeBuilder : globalStateBuilders.values()) { + storeBuilder.withCachingDisabled(); + } + } + + // build global state stores + for (final StoreBuilder storeBuilder : globalStateBuilders.values()) { + globalStateStores.put(storeBuilder.name(), storeBuilder.build()); + } + + // adjust the topology if optimization is turned on. + // TODO: to be removed post 2.0 + adjust(config); + + return this; + } + public final void addSource(final Topology.AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -524,7 +557,7 @@ public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, processorName, stateUpdateSupplier, storeBuilder.name(), - storeBuilder.build()); + storeBuilder); } private void validateTopicNotAlreadyRegistered(final String topic) { @@ -552,8 +585,8 @@ public final void connectProcessorAndStateStores(final String processorName, } } - public final void connectSourceStoreAndTopic(final String sourceStoreName, - final String topic) { + private void connectSourceStoreAndTopic(final String sourceStoreName, + final String topic) { if (storeToChangelogTopic.containsKey(sourceStoreName)) { throw new TopologyException("Source store " + sourceStoreName + " is already added."); } @@ -593,7 +626,7 @@ private void validateGlobalStoreArguments(final String sourceName, if (nodeFactories.containsKey(processorName)) { throw new 
TopologyException("Processor " + processorName + " is already added."); } - if (stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName)) { + if (stateFactories.containsKey(storeName) || globalStateBuilders.containsKey(storeName)) { throw new TopologyException("StateStore " + storeName + " is already added."); } if (loggingEnabled) { @@ -612,7 +645,7 @@ private void addGlobalStore(final String sourceName, final String processorName, final ProcessorSupplier stateUpdateSupplier, final String name, - final KeyValueStore store) { + final StoreBuilder<KeyValueStore> storeBuilder) { final String[] topics = {topic}; final String[] predecessors = {sourceName}; final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, @@ -631,13 +664,13 @@ private void addGlobalStore(final String sourceName, nodeFactories.put(processorName, nodeFactory); nodeGrouper.add(processorName); nodeGrouper.unite(processorName, predecessors); - globalStateStores.put(name, store); + globalStateBuilders.put(name, storeBuilder); connectSourceStoreAndTopic(name, topic); } private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) { - if (globalStateStores.containsKey(stateStoreName)) { + if (globalStateBuilders.containsKey(stateStoreName)) { throw new TopologyException("Global StateStore " + stateStoreName + " can be used by a Processor without being specified; it should not be explicitly passed."); } @@ -804,6 +837,8 @@ public synchronized ProcessorTopology build(final Integer topicGroupId) { * @return ProcessorTopology */ public synchronized ProcessorTopology buildGlobalStateTopology() { + Objects.requireNonNull(applicationId, "topology has not completed optimization"); + final Set<String> globalGroups = globalNodeGroups(); if (globalGroups.isEmpty()) { return null; @@ -825,6 +860,8 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { } private ProcessorTopology build(final Set<String> nodeGroup) { 
+ Objects.requireNonNull(applicationId, "topology has not completed optimization"); + final Map<String, ProcessorNode> processorMap = new LinkedHashMap<>(); final Map<String, SourceNode> topicSourceMap = new HashMap<>(); final Map<String, SinkNode> topicSinkMap = new HashMap<>(); @@ -950,10 +987,14 @@ private void buildProcessorNode(final Map<String, ProcessorNode> processorMap, * @return map containing all global {@link StateStore}s */ public Map<String, StateStore> globalStateStores() { + Objects.requireNonNull(applicationId, "topology has not completed optimization"); + return Collections.unmodifiableMap(globalStateStores); } public Set<String> allStateStoreName() { + Objects.requireNonNull(applicationId, "topology has not completed optimization"); + final Set<String> allNames = new HashSet<>(stateFactories.keySet()); allNames.addAll(globalStateStores.keySet()); return Collections.unmodifiableSet(allNames); @@ -1036,7 +1077,7 @@ private void buildProcessorNode(final Map<String, ProcessorNode> processorMap, // Adjust the generated topology based on the configs. // Not exposed as public API and should be removed post 2.0 - public void adjust(final StreamsConfig config) { + private void adjust(final StreamsConfig config) { final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE); if (enableOptimization20) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java index 2d1b241e820..a930468d0f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java @@ -32,6 +32,12 @@ */ StoreBuilder<T> withCachingEnabled(); + /** + * Disable caching on the store. + * @return this + */ + StoreBuilder<T> withCachingDisabled(); + /** * Maintain a changelog for any changes made to the store. 
* Use the provided config to set the config of the changelog topic. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java index fdcd2e72d64..898db9e482c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java @@ -52,6 +52,12 @@ public AbstractStoreBuilder(final String name, return this; } + @Override + public StoreBuilder<T> withCachingDisabled() { + enableCaching = false; + return this; + } + @Override public StoreBuilder<T> withLoggingEnabled(final Map<String, String> config) { Objects.requireNonNull(config, "config can't be null"); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 28e3d552c26..481c8602c86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -81,11 +81,12 @@ @Before public void before() { props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); + 
props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streams = new KafkaStreams(builder.build(), props); } @@ -238,10 +239,10 @@ public boolean conditionMet() { @Test public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() { final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1"); - props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1"); + props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 200); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 56e6a6dc1c9..4fab4ff87f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -53,7 +53,7 @@ public class StreamsBuilderTest { private final StreamsBuilder builder = new StreamsBuilder(); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Test public void shouldAllowJoinUnmaterializedFilteredKTable() { @@ -61,7 +61,7 @@ public void shouldAllowJoinUnmaterializedFilteredKTable() { builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); 
builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); assertThat(topology.stateStores().size(), equalTo(1)); assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name()))); @@ -75,7 +75,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() { builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); assertThat(topology.stateStores().size(), equalTo(2)); assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store"))); @@ -88,7 +88,7 @@ public void shouldAllowJoinUnmaterializedMapValuedKTable() { builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); assertThat(topology.stateStores().size(), equalTo(1)); assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name()))); @@ -102,7 +102,7 @@ public void shouldAllowJoinMaterializedMapValuedKTable() { builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); 
assertThat(topology.stateStores().size(), equalTo(2)); assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store"))); @@ -116,7 +116,7 @@ public void shouldAllowJoinUnmaterializedJoinedKTable() { builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER); builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); assertThat(topology.stateStores().size(), equalTo(2)); assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name()))); @@ -130,7 +130,7 @@ public void shouldAllowJoinMaterializedJoinedKTable() { builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER); builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); assertThat(topology.stateStores().size(), equalTo(3)); assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store"))); @@ -143,7 +143,7 @@ public void shouldAllowJoinMaterializedSourceKTable() { builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER); builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); assertThat(topology.stateStores().size(), equalTo(1)); assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name()))); @@ -284,11 +284,11 @@ public void shouldReuseSourceTopicAsChangelogsWithOptimization20() { final String topic = "topic"; builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")); final Topology topology = builder.build(); - final Properties props = StreamsTestUtils.minimalStreamsConfig(); + final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); - internalTopologyBuilder.adjust(new StreamsConfig(props)); + internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)); assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic"))); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 1791672ec4c..83279cc4ddc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -49,7 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; -import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig; +import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -523,7 +523,7 @@ public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() { @Test public void shouldUseNewConfigsWhenPresent() { - final Properties props = minimalStreamsConfig(); + final Properties props = 
getStreamsConfig(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); @@ -536,7 +536,7 @@ public void shouldUseNewConfigsWhenPresent() { @Test public void shouldUseCorrectDefaultsWhenNoneSpecified() { - final StreamsConfig config = new StreamsConfig(minimalStreamsConfig()); + final StreamsConfig config = new StreamsConfig(getStreamsConfig()); assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde); assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde); assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp); @@ -544,7 +544,7 @@ public void shouldUseCorrectDefaultsWhenNoneSpecified() { @Test public void shouldSpecifyCorrectKeySerdeClassOnError() { - final Properties props = minimalStreamsConfig(); + final Properties props = getStreamsConfig(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig config = new StreamsConfig(props); try { @@ -558,7 +558,7 @@ public void shouldSpecifyCorrectKeySerdeClassOnError() { @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectValueSerdeClassOnError() { - final Properties props = minimalStreamsConfig(); + final Properties props = getStreamsConfig(); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig config = new StreamsConfig(props); try { diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java index 940ec0ec70b..e1c7c1170ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.test.StreamsTestUtils; /** * This class allows to access the {@link InternalTopologyBuilder} a {@link Topology} object. @@ -25,14 +26,14 @@ public class TopologyWrapper extends Topology { static public InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) { - return topology.internalTopologyBuilder; + return topology.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())); } public InternalTopologyBuilder getInternalBuilder() { - return internalTopologyBuilder; + return internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())); } - public void setApplicationId(final String applicationId) { - internalTopologyBuilder.setApplicationId(applicationId); + public InternalTopologyBuilder getInternalBuilder(final String applicationId) { + return internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(applicationId))); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 770f579ad98..9aa07729c20 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -163,8 +163,13 @@ private void runSimpleCopyTest(final int numberOfRestarts, Serdes.LongSerde.class.getName(), new Properties() { { - put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); + 
put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } })); @@ -248,6 +253,10 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception { new Properties() { { put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); + put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } })); @@ -669,6 +678,8 @@ public void close() { } put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads); put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, -1); + put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000); put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1); put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java index 08ab1200a94..ac5a41837fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java @@ -140,6 +140,9 @@ public static void startKafkaCluster() throws InterruptedException { public void setUp() throws IOException { final Properties props = new 
Properties(); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); @@ -277,6 +280,9 @@ public void shouldThrowExceptionOverlappingTopic() { @Test public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException { final Properties props = new Properties(); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); final Properties localConfig = StreamsTestUtils.getStreamsConfig( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index fe7ee266657..153c5a1c508 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -35,9 +36,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import 
org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; @@ -89,10 +88,10 @@ public void before() { streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } @After @@ -147,14 +146,9 @@ public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); - textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(final String value) { - return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); - } - }) - .groupBy(MockMapper.<String, String>selectValueMapper()) - .count(Materialized.<String, Long, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("Counts")); + textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) + .groupBy(MockMapper.selectValueMapper()) + .count(Materialized.as("Counts")); final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp); streams.start(); @@ -191,19 +185,10 @@ public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Except final int durationMs = 2000; - 
textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(final String value) { - return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); - } - }) - .groupBy(MockMapper.<String, String>selectValueMapper()) + textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) + .groupBy(MockMapper.selectValueMapper()) .windowedBy(TimeWindows.of(1000).grace(0L)) - .count( - Materialized - .<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows") - .withRetention(2_000L) - ); + .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountWindows").withRetention(2_000L)); final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp); streams.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java index 29f077fc52c..2269a5dd88a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java @@ -151,6 +151,7 @@ public void setup() { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); @@ -158,7 +159,7 @@ public void setup() { streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), purgeIntervalMs); 
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), purgeSegmentBytes); streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), purgeSegmentBytes / 2); // we cannot allow batch size larger than segment size - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index d3a7aee6523..7cfde6122c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -18,6 +18,7 @@ import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -29,6 +30,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -115,7 +117,12 @@ public void setUp() throws Exception { CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC); final Properties properties = new Properties(); + properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + 
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java index 200062e0fb9..58d903a50ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java @@ -165,10 +165,10 @@ private void runIntegrationTest(final String optimizationConfig, streams.start(); final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L)); - final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size()); + final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues); final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6")); - final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues.size()); + final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, 
expectedStringCountKeyValues); assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues)); assertThat(receivedCountStringKeyValues, equalTo(expectedStringCountKeyValues)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index c1d07dcdcca..5eb4fc7e5b4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -106,6 +106,7 @@ private Properties props(final String applicationId) { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java index 6af333121d0..c43fcd80e93 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java @@ -99,7 +99,7 @@ public void shouldInnerJoinWithStream() { private void verifyJoin(final Map<String, String> expected) { final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) { // write some data to the global table diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index c6c4996846d..2bf6971ff7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; @@ -33,6 +34,7 @@ import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; import java.util.Collections; @@ -60,7 +62,6 @@ private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store")); { - builder.internalTopologyBuilder.setApplicationId(APP_ID); materialized.generateStoreNameIfNeeded(builder, storePrefix); } @@ -132,7 +133,9 @@ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() { final KTable table1 = builder.table("topic2", consumed, materializedInternal); builder.buildAndOptimizeTopology(); - final ProcessorTopology topology = builder.internalTopologyBuilder.build(null); + final ProcessorTopology topology = builder.internalTopologyBuilder + .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))) + .build(null); assertEquals(1, topology.stateStores().size()); final String storeName = "prefix-STATE-STORE-0000000000"; @@ -174,7 +177,9 @@ public 
void shouldBuildSimpleGlobalTableTopology() { materializedInternal); builder.buildAndOptimizeTopology(); - final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology(); + final ProcessorTopology topology = builder.internalTopologyBuilder + .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))) + .buildGlobalStateTopology(); final List<StateStore> stateStores = topology.globalStateStores(); assertEquals(1, stateStores.size()); @@ -182,7 +187,9 @@ public void shouldBuildSimpleGlobalTableTopology() { } private void doBuildGlobalTopologyWithAllGlobalTables() { - final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology(); + final ProcessorTopology topology = builder.internalTopologyBuilder + .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))) + .buildGlobalStateTopology(); final List<StateStore> stateStores = topology.globalStateStores(); final Set<String> sourceTopics = topology.sourceTopics(); @@ -263,8 +270,9 @@ public void shouldMapStateStoresToCorrectSourceTopics() { final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper()); - mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")); + mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.as("count")); builder.buildAndOptimizeTopology(); + builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))); assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store")); assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count")); } @@ -359,6 +367,7 @@ public void 
shouldAddRegexTopicToLatestAutoOffsetResetList() { public void shouldHaveNullTimestampExtractorWhenNoneSupplied() { builder.stream(Collections.singleton("topic"), consumed); builder.buildAndOptimizeTopology(); + builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))); final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null); assertNull(processorTopology.source("topic").getTimestampExtractor()); } @@ -368,7 +377,9 @@ public void shouldUseProvidedTimestampExtractor() { final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor())); builder.stream(Collections.singleton("topic"), consumed); builder.buildAndOptimizeTopology(); - final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null); + final ProcessorTopology processorTopology = builder.internalTopologyBuilder + .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))) + .build(null); assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } @@ -376,7 +387,9 @@ public void shouldUseProvidedTimestampExtractor() { public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() { builder.table("topic", consumed, materialized); builder.buildAndOptimizeTopology(); - final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null); + final ProcessorTopology processorTopology = builder.internalTopologyBuilder + .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))) + .build(null); assertNull(processorTopology.source("topic").getTimestampExtractor()); } @@ -385,7 +398,9 @@ public void ktableShouldUseProvidedTimestampExtractor() { final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor())); builder.table("topic", consumed, materialized); builder.buildAndOptimizeTopology(); - 
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null); + final ProcessorTopology processorTopology = builder.internalTopologyBuilder + .rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig(APP_ID))) + .build(null); assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 1517f0e9b6b..a1f8b27f4d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -75,7 +75,7 @@ private KGroupedStream<String, String> groupedStream; private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Before public void before() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index 31863f28e89..662ede7d330 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -56,7 +56,7 @@ private final StreamsBuilder builder = new StreamsBuilder(); private static final String INVALID_STORE_NAME = "~foo bar~"; private KGroupedTable<String, String> groupedTable; - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), 
Serdes.Integer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer()); private final String topic = "input"; @Before diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index b7035aa16c5..b9773469350 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -39,7 +39,7 @@ private final String topicName = "topic"; private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @SuppressWarnings("unchecked") @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index 6306d2e0cd7..d7e62406e6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -37,7 +37,7 @@ private final String topicName = "topic"; private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); private final Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() { @Override diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java index 8daad991a68..7feb18ffb23 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -39,7 +39,7 @@ private String topicName = "topic"; private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testFlatMap() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java index aca091136ff..0c4495e0a1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java @@ -38,7 +38,7 @@ private String topicName = "topic"; private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testFlatMapValues() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java index fbcd6db691c..feed2fbaa65 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java @@ -41,7 +41,7 @@ private final String topicName = "topic"; private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testForeach() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java index d5c5a54c0b4..a68583ec045 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java @@ -74,7 +74,7 @@ public String apply(final Integer key, final String value) { }; stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier); - final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); driver = new TopologyTestDriver(builder.build(), props); processor = supplier.theCapturedProcessor(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java index 248c3eed138..aebc2a53cd6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java @@ -76,7 +76,7 @@ public String apply(final Integer key, final String value) { }; stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier); - final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); driver = new TopologyTestDriver(builder.build(), props); processor = supplier.theCapturedProcessor(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 354fa0a147d..bce7fc80a40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -74,7 +74,7 @@ private StreamsBuilder builder; private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Before public void before() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 963b681472b..971ee62d284 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -53,7 +53,7 @@ private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); private final 
ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Test public void shouldLogAndMeterOnSkippedRecordsWithNullValue() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 338b96aa98a..856de3d85c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -48,7 +48,7 @@ private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Test public void testLeftJoin() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index 6ffce04c837..eb6536d37a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -72,7 +72,7 @@ public void setUp() { table = builder.table(tableTopic, consumed); stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier); - final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); driver = new TopologyTestDriver(builder.build(), props, 0L); processor = supplier.theCapturedProcessor(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 1c3e027ff9f..8c57038b042 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -68,7 +68,7 @@ public void setUp() { table = builder.table(tableTopic, consumed); stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier); - final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); driver = new TopologyTestDriver(builder.build(), props, 0L); processor = supplier.theCapturedProcessor(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index e1851c1c04a..2692c17f021 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -38,7 +38,7 @@ private String topicName = "topic"; private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), 
Serdes.String()); @Test public void testMap() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java index 8de8a81a435..67891be18a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java @@ -39,7 +39,7 @@ private String topicName = "topic"; private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>(); private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testFlatMapValues() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java index 3e4012e77af..780bcae21d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java @@ -40,7 +40,7 @@ private final String topicName = "topic"; private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void shouldObserveStreamElements() { diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java index 62f767746ea..5f804d45cf4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java @@ -41,7 +41,7 @@ private String topicName = "topic_key_select"; private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(topicName, new StringSerializer(), new IntegerSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer()); @Test public void testSelectKey() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 74cd7bdb4eb..c7fd7cdc32a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -64,24 +64,9 @@ private static final long GAP_MS = 5 * 60 * 1000L; private static final String STORE_NAME = "session-store"; - private final Initializer<Long> initializer = new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }; - private final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>() { - @Override - public Long apply(final String aggKey, final String value, final Long aggregate) { - return aggregate + 1; - } - }; - private final Merger<String, Long> sessionMerger = new Merger<String, Long>() { - @Override - 
public Long apply(final String aggKey, final Long aggOne, final Long aggTwo) { - return aggOne + aggTwo; - } - }; + private final Initializer<Long> initializer = () -> 0L; + private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1; + private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo; private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator = new KStreamSessionWindowAggregate<>( SessionWindows.with(GAP_MS), @@ -96,7 +81,6 @@ public Long apply(final String aggKey, final Long aggOne, final Long aggTwo) { private InternalMockProcessorContext context; private Metrics metrics; - @Before public void initializeStore() { final File stateDir = TestUtils.tempDirectory(); @@ -107,7 +91,7 @@ public void initializeStore() { Serdes.String(), Serdes.String(), metrics, - new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), + new StreamsConfig(StreamsTestUtils.getStreamsConfig()), NoOpRecordCollector::new, new ThreadCache(new LogContext("testCache "), 100000, metrics) ) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index 9a61874abbe..a8ee681cb8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -44,7 +44,7 @@ private String topicName = "topic"; private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer()); @Rule public final KStreamTestDriver kstreamDriver = new 
KStreamTestDriver(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index c3996507055..570053c0c85 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -51,7 +51,7 @@ private String topicName = "topic"; private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>(); private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer()); @Mock(MockType.NICE) private ProcessorContext context; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index af7cff6d17a..5a295b8a50a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -60,7 +60,7 @@ public class KStreamWindowAggregateTest { private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Test public void testAggBasic() { diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java index bc8ca95187a..3746ae9a322 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java @@ -48,7 +48,7 @@ public class KStreamWindowReduceTest { - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 2995b234c96..3e143f5807b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -49,7 +49,7 @@ private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer()); private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer()); private void doTestKTable(final StreamsBuilder builder, final KTable<String, Integer> table2, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 60d7426224f..eb586e60de1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -64,7 +64,7 @@ private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); private StreamsBuilder builder; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index bf5660a7f11..ce7de16501c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -40,7 +40,7 @@ public class KTableMapKeysTest { private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testMapKeysConvertingToStream() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 
ddfd5a5c941..0f56043be50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -51,7 +51,7 @@ private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String()); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) { try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 80a60ab2f20..2055f9cf319 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -48,7 +48,7 @@ private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String()); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @Test public void testKTable() { diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index 825edb3eb37..34a235ac482 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -55,7 +55,7 @@ private static final String TOPIC = "input"; private final StreamsBuilder builder = new StreamsBuilder(); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final Merger<String, String> sessionMerger = new Merger<String, String>() { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index 7b885b23bf2..0e541c91acc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -54,7 +54,7 @@ private static final String TOPIC = "input"; private final StreamsBuilder builder = new StreamsBuilder(); private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private 
TimeWindowedKStream<String, String> windowedStream; @Before diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 54a927c8072..c4699ece515 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -37,7 +37,7 @@ import java.util.Properties; -import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig; +import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -182,7 +182,7 @@ public void appConfigsShouldReturnUnrecognizedValues() { private static class TestProcessorContext extends AbstractProcessorContext { static Properties config; static { - config = minimalStreamsConfig(); + config = getStreamsConfig(); // Value must be a string to test className -> class conversion config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetter.class.getName()); config.put("user.supplied.config", "user-suppplied-value"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 95c6943088c..37a6fdba453 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -104,7 +104,7 @@ public String newStoreName(final String prefix) { properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah"); properties.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath()); config = new StreamsConfig(properties); - globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(), + globalStreamThread = new GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(), config, mockConsumer, new StateDirectory(config, time), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 78c217d236e..7230e5fcbce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; @@ -32,6 +33,7 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStoreBuilder; import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; import java.lang.reflect.Field; @@ -626,7 +628,7 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception @Test public void shouldAddTimestampExtractorPerSource() { builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic"); - final ProcessorTopology processorTopology = builder.build(null); + final ProcessorTopology processorTopology = builder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())).build(null); assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class)); } @@ -634,7 +636,7 @@ public void shouldAddTimestampExtractorPerSource() { public void shouldAddTimestampExtractorWithPatternPerSource() { final Pattern pattern = Pattern.compile("t.*"); builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern); - final ProcessorTopology processorTopology = builder.build(null); + final ProcessorTopology processorTopology = builder.rewriteTopology(new StreamsConfig(StreamsTestUtils.getStreamsConfig())).build(null); assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 3495e80eef6..587cae20179 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -102,8 +102,6 @@ public void cleanup() { @Test public void testTopologyMetadata() { - topology.setApplicationId("X"); - topology.addSource("source-1", "topic-1"); topology.addSource("source-2", "topic-2", "topic-3"); topology.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1"); @@ -111,7 +109,7 @@ public void testTopologyMetadata() { topology.addSink("sink-1", "topic-3", "processor-1"); topology.addSink("sink-2", "topic-4", "processor-1", "processor-2"); - final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); + final ProcessorTopology processorTopology = topology.getInternalBuilder("X").build(); assertEquals(6, processorTopology.processors().size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 0cf147cec2b..5b1da16c3cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -53,7 +53,7 @@ public class StreamsMetadataStateTest { - private StreamsMetadataState discovery; + private StreamsMetadataState metadataState; private HostInfo hostOne; private HostInfo hostTwo; private HostInfo hostThree; @@ -121,8 +121,8 @@ public Object apply(final Object value) { new PartitionInfo("topic-four", 0, null, null, null)); cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet()); - discovery = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne); - discovery.onChange(hostToPartitions, cluster); + metadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne); + metadataState.onChange(hostToPartitions, cluster); partitioner = new StreamPartitioner<String, Object>() { @Override public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { @@ -145,7 +145,7 @@ public void shouldGetAllStreamInstances() { final StreamsMetadata three = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"), Collections.singleton(topic3P0)); - final Collection<StreamsMetadata> actual = discovery.getAllMetadata(); + final Collection<StreamsMetadata> actual = metadataState.getAllMetadata(); assertEquals(3, actual.size()); assertTrue("expected " + actual + " to contain " + one, actual.contains(one)); assertTrue("expected " + actual + " to contain " + two, actual.contains(two)); @@ -165,11 +165,11 @@ public boolean test(final Object key, final Object value) { final HostInfo hostFour = new 
HostInfo("host-four", 8080); hostToPartitions.put(hostFour, Utils.mkSet(tp5)); - discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null)))); + metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null)))); final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.singleton(globalTable), Collections.singleton(tp5)); - final Collection<StreamsMetadata> actual = discovery.getAllMetadata(); + final Collection<StreamsMetadata> actual = metadataState.getAllMetadata(); assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected)); } @@ -179,7 +179,7 @@ public void shouldGetInstancesForStoreName() { Utils.mkSet(topic1P0, topic2P1, topic4P0)); final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"), Utils.mkSet(topic2P0, topic1P1)); - final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("table-one"); + final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("table-one"); assertEquals(2, actual.size()); assertTrue("expected " + actual + " to contain " + one, actual.contains(one)); assertTrue("expected " + actual + " to contain " + two, actual.contains(two)); @@ -187,12 +187,12 @@ public void shouldGetInstancesForStoreName() { @Test(expected = NullPointerException.class) public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() { - discovery.getAllMetadataForStore(null); + metadataState.getAllMetadataForStore(null); } @Test public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() { - final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("not-a-store"); + final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("not-a-store"); assertTrue(actual.isEmpty()); } @@ 
-201,12 +201,12 @@ public void shouldGetInstanceWithKey() { final TopicPartition tp4 = new TopicPartition("topic-three", 1); hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4)); - discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); + metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"), Collections.singleton(topic3P0)); - final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", + final StreamsMetadata actual = metadataState.getMetadataWithKey("table-three", "the-key", Serdes.String().serializer()); @@ -218,19 +218,19 @@ public void shouldGetInstanceWithKeyAndCustomPartitioner() { final TopicPartition tp4 = new TopicPartition("topic-three", 1); hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4)); - discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); + metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-three", "merged-table"), Utils.mkSet(topic2P0, tp4)); - final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", partitioner); + final StreamsMetadata actual = metadataState.getMetadataWithKey("table-three", "the-key", partitioner); assertEquals(expected, actual); } @Test public void shouldReturnNotAvailableWhenClusterIsEmpty() { - discovery.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty()); - final StreamsMetadata result = discovery.getMetadataWithKey("table-one", "a", 
Serdes.String().serializer()); + metadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty()); + final StreamsMetadata result = metadataState.getMetadataWithKey("table-one", "a", Serdes.String().serializer()); assertEquals(StreamsMetadata.NOT_AVAILABLE, result); } @@ -238,12 +238,12 @@ public void shouldReturnNotAvailableWhenClusterIsEmpty() { public void shouldGetInstanceWithKeyWithMergedStreams() { final TopicPartition topic2P2 = new TopicPartition("topic-two", 2); hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2)); - discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null)))); + metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null)))); final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("global-table", "table-two", "table-one", "merged-table"), Utils.mkSet(topic2P0, topic1P1, topic2P2)); - final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() { + final StreamsMetadata actual = metadataState.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() { @Override public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { return 2; @@ -256,7 +256,7 @@ public Integer partition(final String topic, final String key, final Object valu @Test public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() { - final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store", + final StreamsMetadata actual = metadataState.getMetadataWithKey("not-a-store", "key", Serdes.String().serializer()); assertNull(actual); @@ -264,28 +264,28 @@ public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() { @Test(expected = NullPointerException.class) public void 
shouldThrowWhenKeyIsNull() { - discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer()); + metadataState.getMetadataWithKey("table-three", null, Serdes.String().serializer()); } @Test(expected = NullPointerException.class) public void shouldThrowWhenSerializerIsNull() { - discovery.getMetadataWithKey("table-three", "key", (Serializer) null); + metadataState.getMetadataWithKey("table-three", "key", (Serializer) null); } @Test(expected = NullPointerException.class) public void shouldThrowIfStoreNameIsNull() { - discovery.getMetadataWithKey(null, "key", Serdes.String().serializer()); + metadataState.getMetadataWithKey(null, "key", Serdes.String().serializer()); } @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void shouldThrowIfStreamPartitionerIsNull() { - discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null); + metadataState.getMetadataWithKey(null, "key", (StreamPartitioner) null); } @Test public void shouldHaveGlobalStoreInAllMetadata() { - final Collection<StreamsMetadata> metadata = discovery.getAllMetadataForStore(globalTable); + final Collection<StreamsMetadata> metadata = metadataState.getAllMetadataForStore(globalTable); assertEquals(3, metadata.size()); for (final StreamsMetadata streamsMetadata : metadata) { assertTrue(streamsMetadata.stateStoreNames().contains(globalTable)); @@ -294,7 +294,7 @@ public void shouldHaveGlobalStoreInAllMetadata() { @Test public void shouldGetMyMetadataForGlobalStoreWithKey() { - final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()); + final StreamsMetadata metadata = metadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()); assertEquals(hostOne, metadata.hostInfo()); } @@ -307,7 +307,7 @@ public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() { @Test public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() { - final StreamsMetadata metadata = 
discovery.getMetadataWithKey(globalTable, "key", partitioner); + final StreamsMetadata metadata = metadataState.getMetadataWithKey(globalTable, "key", partitioner); assertEquals(hostOne, metadata.hostInfo()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 19bd523a8d5..1ac6d94cfee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -71,7 +71,7 @@ public void setUp() { Serdes.String(), Serdes.Long(), streamsMetrics, - new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), + new StreamsConfig(StreamsTestUtils.getStreamsConfig()), new RecordCollector.Supplier() { @Override public RecordCollector recordCollector() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 113e3e1151e..5ae32eb4eff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -63,7 +63,7 @@ @Before public void setUp() { - final Properties props = StreamsTestUtils.minimalStreamsConfig(); + final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); rocksDBStore = new RocksDBStore("test"); dir = TestUtils.tempDirectory(); @@ -131,7 +131,7 @@ public void shouldCallRocksDbConfigSetter() { @Test public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() { final File tmpDir = TestUtils.tempDirectory(); - final InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new 
StreamsConfig(StreamsTestUtils.minimalStreamsConfig())); + final InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new StreamsConfig(StreamsTestUtils.getStreamsConfig())); assertTrue(tmpDir.setReadOnly()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 75bb21943ad..e0130098b38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -96,8 +96,7 @@ public void before() { configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog"); configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog"); - topology.setApplicationId(applicationId); - final ProcessorTopology processorTopology = topology.getInternalBuilder().build(); + final ProcessorTopology processorTopology = topology.getInternalBuilder(applicationId).build(); tasks = new HashMap<>(); stateDirectory = new StateDirectory(streamsConfig, new MockTime()); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index ec8d3280a4d..6d4f5e25419 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -68,7 +68,7 @@ public InternalMockProcessorContext() { null, null, new StreamsMetricsImpl(new Metrics(), "mock"), - new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), + new StreamsConfig(StreamsTestUtils.getStreamsConfig()), null, null ); @@ -99,12 +99,8 @@ public InternalMockProcessorContext(final StateSerdes<?, ?> serdes, serdes.keySerde(), serdes.valueSerde(), new 
StreamsMetricsImpl(metrics, "mock"), - new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() { - @Override - public RecordCollector recordCollector() { - return collector; - } - }, + new StreamsConfig(StreamsTestUtils.getStreamsConfig()), + () -> collector, null ); } @@ -118,7 +114,7 @@ public InternalMockProcessorContext(final File stateDir, keySerde, valSerde, new StreamsMetricsImpl(new Metrics(), "mock"), - new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), + new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, cache ); diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 940651915f2..1d64316cb3c 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -16,10 +16,10 @@ */ package org.apache.kafka.test; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -44,48 +44,37 @@ public static Properties getStreamsConfig(final String applicationId, final String valueSerdeClassName, final Properties additional) { - final Properties streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
valueSerdeClassName); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.putAll(additional); - return streamsConfiguration; - - } - - public static Properties topologyTestConfig(final String applicationId, - final String bootstrapServers, - final String keyDeserializer, - final String valueDeserializer) { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); - props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keyDeserializer); - props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueDeserializer); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); + props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.putAll(additional); return props; } - public static Properties topologyTestConfig(final Serde keyDeserializer, - final Serde valueDeserializer) { - return topologyTestConfig( + public static Properties getStreamsConfig(final Serde keyDeserializer, + final Serde valueDeserializer) { + return getStreamsConfig( UUID.randomUUID().toString(), "localhost:9091", keyDeserializer.getClass().getName(), - valueDeserializer.getClass().getName()); + valueDeserializer.getClass().getName(), + new Properties()); + } + + public static Properties getStreamsConfig(final String applicationId) { + return getStreamsConfig( + 
applicationId, + "localhost:9091", + Serdes.ByteArraySerde.class.getName(), + Serdes.ByteArraySerde.class.getName(), + new Properties()); } - public static Properties minimalStreamsConfig() { - final Properties properties = new Properties(); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString()); - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "anyserver:9092"); - return properties; + public static Properties getStreamsConfig() { + return getStreamsConfig(UUID.randomUUID().toString()); } public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index d2796db6f9c..5f3f96145a3 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -239,7 +239,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, mockWallClockTime = new MockTime(initialWallClockTimeMs); internalTopologyBuilder = builder; - internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)); + internalTopologyBuilder.rewriteTopology(streamsConfig); processorTopology = internalTopologyBuilder.build(null); globalTopology = internalTopologyBuilder.buildGlobalStateTopology(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove caching wrapper stores if cache-size is configured to zero bytes > ----------------------------------------------------------------------- > > Key: KAFKA-6998 > URL: https://issues.apache.org/jira/browse/KAFKA-6998 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Matthias J. Sax > Assignee: Guozhang Wang > Priority: Major > Labels: kip > > Users can disable caching globally by setting the cache size to zero in their > config. However, this only effectively disables the caching layer; > the code is still in place. > We should consider removing the caching wrappers completely for this case. > The tricky part is that we insert the caching layer at compile time, i.e., > when calling `StreamsBuilder#build()` – at this point, we don't know the > configuration yet. Thus, we need to find a way to rewrite the topology after > it is passed to `KafkaStreams` in case the cache size is set to zero. > KIP: [KIP-356: Add withCachingDisabled() to > StoreBuilder|https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+withCachingDisabled%28%29+to+StoreBuilder?src=jira] -- This message was sent by Atlassian JIRA (v7.6.3#76005)