This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 18e60cb0001 KAFKA-12497: Skip periodic offset commits for failed source tasks (#10528) 18e60cb0001 is described below commit 18e60cb0001f72018d9d00221253f69b8761a2e7 Author: Chris Egerton <chr...@aiven.io> AuthorDate: Thu Oct 13 10:15:42 2022 -0400 KAFKA-12497: Skip periodic offset commits for failed source tasks (#10528) Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect. Reviewers: Nigel Liang <ni...@nigelliang.com>, Kalpesh Patel <kpa...@confluent.io>, John Roesler <vvcep...@apache.org>, Tom Bentley <tbent...@redhat.com> --- build.gradle | 1 + checkstyle/import-control.xml | 5 +-- .../kafka/common/utils}/LogCaptureAppender.java | 2 +- .../connect/runtime/SourceTaskOffsetCommitter.java | 8 ++++- .../kafka/connect/runtime/WorkerSourceTask.java | 8 +++++ .../apache/kafka/connect/runtime/WorkerTask.java | 7 ++++ .../connect/runtime/WorkerSourceTaskTest.java | 40 +++++++++++++++++----- .../kafka/connect/runtime/rest/RestServerTest.java | 6 ++-- .../unit/kafka/utils/LogCaptureAppender.scala | 6 ---- .../org/apache/kafka/streams/KafkaStreamsTest.java | 2 +- .../apache/kafka/streams/StreamsConfigTest.java | 2 +- .../integration/AdjustStreamThreadCountTest.java | 2 +- .../kstream/internals/KGroupedStreamImplTest.java | 2 +- .../kstream/internals/KStreamKStreamJoinTest.java | 2 +- .../kstream/internals/KStreamKTableJoinTest.java | 2 +- ...KStreamSessionWindowAggregateProcessorTest.java | 4 +-- .../KStreamSlidingWindowAggregateTest.java | 4 +-- .../internals/KStreamWindowAggregateTest.java | 2 +- .../internals/KTableKTableInnerJoinTest.java | 2 +- .../internals/KTableKTableLeftJoinTest.java | 2 +- .../internals/KTableKTableOuterJoinTest.java | 2 +- .../internals/KTableKTableRightJoinTest.java | 4 +-- .../kstream/internals/KTableSourceTest.java | 4 +-- .../internals/GlobalStateManagerImplTest.java | 2 +- .../internals/InternalTopicManagerTest.java | 2 +- .../processor/internals/PartitionGroupTest.java | 2 +- .../internals/ProcessorStateManagerTest.java | 2 +- .../processor/internals/RecordCollectorTest.java | 2 +- .../processor/internals/StateDirectoryTest.java | 2 +- .../internals/StoreChangelogReaderTest.java | 2 +- .../processor/internals/StreamThreadTest.java | 2 +- .../processor/internals/TaskManagerTest.java | 2 +- ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 2 +- .../state/internals/AbstractKeyValueStoreTest.java | 2 +- .../AbstractRocksDBSegmentedBytesStoreTest.java | 2 +- .../internals/AbstractSessionBytesStoreTest.java | 2 +- .../internals/AbstractWindowBytesStoreTest.java | 2 +- .../internals/CachingInMemorySessionStoreTest.java | 2 +- .../CachingPersistentSessionStoreTest.java | 2 +- .../CachingPersistentWindowStoreTest.java | 2 +- ...sToDbOptionsColumnFamilyOptionsAdapterTest.java | 2 +- .../internals/RocksDBTimestampedStoreTest.java | 2 +- ...imeOrderedCachingPersistentWindowStoreTest.java | 2 +- .../internals/TimeOrderedWindowStoreTest.java | 2 +- 44 files changed, 98 insertions(+), 63 deletions(-) diff --git a/build.gradle b/build.gradle index 7cf6cc44b1c..daa03038150 100644 --- a/build.gradle +++ b/build.gradle @@ -1274,6 +1274,7 @@ project(':clients') { testImplementation libs.bcpkix testImplementation libs.junitJupiter + testImplementation libs.log4j testImplementation libs.mockitoInline testRuntimeOnly libs.slf4jlog4j diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b2959d9ae6b..ed0efe10aa1 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -198,6 +198,7 @@ <subpackage name="utils"> <allow pkg="org.apache.kafka.common" /> + <allow pkg="org.apache.log4j" /> </subpackage> <subpackage name="quotas"> @@ -454,9 +455,6 @@ <allow pkg="com.fasterxml.jackson" /> <allow pkg="kafka.utils" /> <allow pkg="org.apache.zookeeper" /> - <subpackage name="testutil"> - <allow pkg="org.apache.log4j" /> - </subpackage> </subpackage> </subpackage> </subpackage> @@ -573,7 +571,6 @@ <allow pkg="com.fasterxml.jackson" /> <allow pkg="org.apache.http"/> <allow pkg="io.swagger.v3.oas.annotations"/> - <allow pkg="kafka.utils" /> <subpackage name="resources"> <allow pkg="org.apache.log4j" /> </subpackage> diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java rename to clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java index 41d15da8c8a..0d569af30a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.processor.internals.testutil; +package org.apache.kafka.common.utils; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java index c3416be45c7..0aa170dcef1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java @@ -104,7 +104,13 @@ class SourceTaskOffsetCommitter { } } - private void commit(WorkerSourceTask workerTask) { + // Visible for testing + static void commit(WorkerSourceTask workerTask) { + if (!workerTask.shouldCommitOffsets()) { + log.trace("{} Skipping offset commit as there are no offsets that should be committed", workerTask); + return; + } + log.debug("{} Committing offsets", workerTask); try { if (workerTask.commitOffsets()) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 8a8de1b0553..4f7fd741e61 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -194,6 +194,14 @@ class WorkerSourceTask extends AbstractWorkerSourceTask { commitOffsets(); } + /** + * @return whether an attempt to commit offsets should be made for the task (i.e., there are pending uncommitted + * offsets and the task's producer has not already failed to send a record with a non-retriable error). + */ + public boolean shouldCommitOffsets() { + return !isFailed(); + } + public boolean commitOffsets() { long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 0d1749901e3..5c93d52a39a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -63,6 +63,7 @@ abstract class WorkerTask implements Runnable { private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final TaskMetricsGroup taskMetricsGroup; private volatile TargetState targetState; + private volatile boolean failed; private volatile boolean stopping; // indicates whether the Worker has asked the task to stop private volatile boolean cancelled; // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown) private final ErrorHandlingMetrics errorMetrics; @@ -84,6 +85,7 @@ abstract class WorkerTask implements Runnable { this.statusListener = taskMetricsGroup; this.loader = loader; this.targetState = initialState; + this.failed = false; this.stopping = false; this.cancelled = false; this.taskMetricsGroup.recordState(this.targetState); @@ -163,6 +165,10 @@ abstract class WorkerTask implements Runnable { protected abstract void close(); + protected boolean isFailed() { + return failed; + } + protected boolean isStopping() { return stopping; } @@ -196,6 +202,7 @@ abstract class WorkerTask implements Runnable { statusListener.onStartup(id); execute(); } catch (Throwable t) { + failed = true; if (cancelled) { log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", this, t); } else if (stopping) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index f411efbde3b..f2530681cf9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ParameterizedTest; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -99,6 +100,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -372,7 +374,7 @@ public class WorkerSourceTaskTest { sourceTask.stop(); EasyMock.expectLastCall(); - expectOffsetFlush(true); + expectEmptyOffsetFlush(); expectClose(); @@ -382,8 +384,9 @@ public class WorkerSourceTaskTest { Future<?> taskFuture = executor.submit(workerTask); assertTrue(awaitLatch(pollLatch)); - //Failure in poll should trigger automatic stop of the worker + //Failure in poll should trigger automatic stop of the task assertTrue(workerTask.awaitStop(1000)); + assertShouldSkipCommit(); taskFuture.get(); assertPollMetrics(0); @@ -467,6 +470,7 @@ public class WorkerSourceTaskTest { workerTask.stop(); workerStopLatch.countDown(); assertTrue(workerTask.awaitStop(1000)); + assertShouldSkipCommit(); taskFuture.get(); assertPollMetrics(0); @@ -483,11 +487,11 @@ public class WorkerSourceTaskTest { // We'll wait for some data, then trigger a flush final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger()); - expectOffsetFlush(true); + expectEmptyOffsetFlush(); sourceTask.stop(); EasyMock.expectLastCall(); - expectOffsetFlush(true); + expectEmptyOffsetFlush(); statusListener.onShutdown(taskId); EasyMock.expectLastCall(); @@ -528,7 +532,7 @@ public class WorkerSourceTaskTest { sourceTask.stop(); EasyMock.expectLastCall(); - expectOffsetFlush(true); + expectEmptyOffsetFlush(); statusListener.onShutdown(taskId); EasyMock.expectLastCall(); @@ -647,10 +651,6 @@ public class WorkerSourceTaskTest { @Test public void testSendRecordsProducerSendFailsImmediately() { - if (!enableTopicCreation) - // should only test with topic creation enabled - return; - createWorkerTask(); SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); @@ -1023,6 +1023,12 @@ public class WorkerSourceTaskTest { } } + private void expectEmptyOffsetFlush() throws Exception { + EasyMock.expect(offsetWriter.beginFlush()).andReturn(false); + sourceTask.commit(); + EasyMock.expectLastCall(); + } + private void assertPollMetrics(int minimumPollCountExpected) { MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup(); MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); @@ -1109,4 +1115,20 @@ public class WorkerSourceTaskTest { EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic)); } } + + private void assertShouldSkipCommit() { + assertFalse(workerTask.shouldCommitOffsets()); + + LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class); + LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class); + try (LogCaptureAppender committerAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class); + LogCaptureAppender taskAppender = LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) { + SourceTaskOffsetCommitter.commit(workerTask); + assertEquals(Collections.emptyList(), taskAppender.getMessages()); + + List<String> committerMessages = committerAppender.getMessages(); + assertEquals(1, committerMessages.size()); + assertTrue(committerMessages.get(0).contains("Skipping offset commit")); + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 82f9e0395e3..8abedd740e9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.rest; -import kafka.utils.LogCaptureAppender; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpHost; @@ -31,6 +30,7 @@ import org.apache.http.impl.client.BasicResponseHandler; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -362,10 +362,10 @@ public class RestServerTest { // Stop the server to flush all logs server.stop(); - Collection<String> logMessages = restServerAppender.getRenderedMessages(); + Collection<String> logMessages = restServerAppender.getMessages(); LogCaptureAppender.unregister(restServerAppender); restServerAppender.close(); - String expectedlogContent = "\"GET / HTTP/1.1\" " + String.valueOf(response.getStatusLine().getStatusCode()); + String expectedlogContent = "\"GET / HTTP/1.1\" " + response.getStatusLine().getStatusCode(); assertTrue(logMessages.stream().anyMatch(logMessage -> logMessage.contains(expectedlogContent))); } diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala index bec77356e1e..2d071452829 100644 --- a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala +++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala @@ -20,7 +20,6 @@ package kafka.utils import org.apache.log4j.{AppenderSkeleton, Level, Logger} import org.apache.log4j.spi.LoggingEvent -import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer class LogCaptureAppender extends AppenderSkeleton { @@ -38,11 +37,6 @@ class LogCaptureAppender extends AppenderSkeleton { } } - def getRenderedMessages: java.util.List[String] = { - return getMessages.map(e => e.getRenderedMessage).asJava - } - - override def close(): Unit = { events.synchronized { events.clear() diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index b788e54923a..766cd04498e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -58,7 +58,7 @@ import org.apache.kafka.streams.processor.internals.StreamsMetadataState; import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl; import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index c1b9e6a140a..496327635ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -33,7 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.junit.Before; import org.junit.Rule; import org.junit.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index 59683e44600..b203b46b376 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -31,7 +32,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 354fbcac318..8076475eaed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.ValueAndTimestamp; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index d4f716df084..6864396d433 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -33,7 +33,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index d162acfa374..dbbe56d4313 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; @@ -43,7 +44,6 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index fc993b63a9e..f72f0891185 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -37,8 +37,8 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender.Event; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java index 796afd38845..50b0f8dcdde 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java @@ -39,8 +39,8 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender.Event; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.ValueAndTimestamp; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 124bb5593b9..23b250e503c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; @@ -47,7 +48,6 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForwa import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 1039cb70ddf..0834727234e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -30,7 +30,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockApiProcessor; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 4af2e3934e1..ee73849e9fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -34,7 +34,7 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockApiProcessor; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index b14973cc934..12e503b3a48 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java index eaf74c8660a..9fcffac3755 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender.Event; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 70e1bccdbe6..2fc5834b36b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -33,8 +33,8 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender.Event; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockApiProcessor; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 4ba88b73e66..0644f1b705a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.WrappedStateStore; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.NoOpReadOnlyStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 74cd6cfaa23..9e4964b5647 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -52,7 +52,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 632b0efc23c..599068c0f3c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -32,7 +32,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 9892a13f087..acb7f0da073 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.TimestampedBytesStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index b3fa516a3f7..c1e8ff8628a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -52,7 +53,6 @@ import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockClientSupplier; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 205f19537ba..c387823fe03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -32,7 +32,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.TestUtils; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index fbc8d423261..1195550fd28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -39,7 +39,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.StreamsTestUtils; import org.easymock.EasyMock; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 075ecb3ef34..37f32ed4cd6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -73,7 +73,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.StreamThread.State; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 69688df6f63..b1fa45c58d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -49,7 +49,7 @@ import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTasks; import org.apache.kafka.streams.processor.internals.Task.State; import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import java.nio.file.Files; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 8a44b9ab501..c81e57589a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -45,7 +45,7 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 19b057a8c98..9e0fb306f28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStoreContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index b82f544026e..6e1b3bfcf8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime; @@ -44,7 +45,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 78d7f08ee84..efbcd8855a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -34,7 +35,6 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index c69dedc91db..d29c6bf88d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -34,7 +34,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index d5aa667c0c5..7e0787e6fee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; @@ -35,7 +36,6 @@ import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 50fd88a2769..54abbd4cc52 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; @@ -34,7 +35,6 @@ import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 72779314e85..27fddc9b075 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -38,7 +39,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StoreBuilder; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 4cafecfa2c3..8eec0a40056 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.easymock.EasyMockRunner; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java index a1d511ae1ca..58e2f9c3a47 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStoreContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.hamcrest.core.IsNull; import org.junit.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index 1ee797d35ec..90b17ded1d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -40,7 +40,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StoreBuilder; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index bf597fb789b..bcc1005eb41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -40,7 +40,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StoreBuilder;