This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch kip-478-part-4-record-processor in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d3321aa0d199c80b04ab9e1121b4501b333db7a5 Author: John Roesler <[email protected]> AuthorDate: Thu Oct 1 12:06:18 2020 -0500 move metadata to context --- .../examples/docs/DeveloperGuideTesting.java | 10 ++++----- .../kafka/streams/processor/api/Processor.java | 9 +------- .../streams/processor/api/ProcessorContext.java | 10 +++++++++ .../internals/AbstractProcessorContext.java | 7 ++++++ .../internals/GlobalProcessorContextImpl.java | 5 +---- .../processor/internals/GlobalStateUpdateTask.java | 6 +---- .../processor/internals/ProcessorAdapter.java | 5 +---- .../processor/internals/ProcessorContextImpl.java | 6 +---- .../streams/processor/internals/ProcessorNode.java | 8 +++---- .../streams/processor/internals/SinkNode.java | 26 ++++++---------------- .../streams/processor/internals/SourceNode.java | 5 +---- .../streams/processor/internals/StreamTask.java | 5 +---- .../org/apache/kafka/streams/KafkaStreamsTest.java | 6 ++--- .../apache/kafka/streams/StreamsBuilderTest.java | 4 +--- .../org/apache/kafka/streams/TopologyTest.java | 4 +--- .../internals/GlobalProcessorContextImplTest.java | 2 +- .../processor/internals/ProcessorNodeTest.java | 6 ++--- .../processor/internals/ProcessorTopologyTest.java | 14 +++++------- .../streams/processor/internals/SinkNodeTest.java | 2 +- .../processor/internals/StreamTaskTest.java | 6 ++--- .../processor/internals/StreamThreadTest.java | 4 +--- .../kafka/test/InternalMockProcessorContext.java | 5 ++--- .../org/apache/kafka/test/MockApiProcessor.java | 4 +--- .../kafka/test/MockInternalProcessorContext.java | 7 ++++++ .../java/org/apache/kafka/test/MockProcessor.java | 7 +----- .../org/apache/kafka/test/MockProcessorNode.java | 6 ++--- .../java/org/apache/kafka/test/MockSourceNode.java | 4 +--- .../kafka/streams/TopologyTestDriverTest.java | 19 ++++++++-------- 28 files changed, 78 insertions(+), 124 deletions(-) diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java index 1ce3445..72e704e 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java @@ -21,26 +21,24 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.time.Duration; import java.time.Instant; -import java.util.Optional; import java.util.Properties; import static org.hamcrest.CoreMatchers.equalTo; @@ -170,7 +168,7 @@ public class DeveloperGuideTesting { } @Override - public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, Long> record) { final Long oldValue = store.get(record.key()); if (oldValue == null || record.value() > oldValue) { store.put(record.key(), record.value()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java index e6feccb..167976b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java @@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import java.time.Duration; -import java.util.Optional; /** * A processor of key-value pair records. @@ -50,14 +49,8 @@ public interface Processor<KIn, VIn, KOut, VOut> { * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator. * * @param record the record to process - * @param recordMetadata the metadata of the record, if it is defined. Note that as long as the processor is - * receiving a record downstream of a Source (i.e., the current record is coming from an - * input topic), the metadata is defined. On the other hand, if a parent processor has - * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that - * punctuator, then there is no record from an input topic, and therefore the metadata - * would be undefined. */ - void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata); + void process(Record<KIn, VIn> record); /** * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java index f4f0fdb..a431993 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java @@ -53,6 +53,16 @@ public interface ProcessorContext<KForward, VForward> { TaskId taskId(); /** + * The metadata of the record, if it is defined. Note that as long as the processor is + * receiving a record downstream of a Source (i.e., the current record is coming from an + * input topic), the metadata is defined. On the other hand, if a parent processor has + * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that + * punctuator, then there is no record from an input topic, and therefore the metadata + * would be undefined. + */ + Optional<RecordMetadata> recordMetadata(); + + /** * Returns the default key serde. * * @return the key serializer diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index ef222e0..c29614a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -31,6 +32,7 @@ import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; public abstract class AbstractProcessorContext implements InternalProcessorContext { @@ -201,6 +203,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte } @Override + public Optional<RecordMetadata> recordMetadata() { + return Optional.ofNullable(recordContext); + } + + @Override public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) { this.currentNode = currentNode; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 9f31309..3468833 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -25,13 +25,11 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; import java.time.Duration; -import java.util.Optional; import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; @@ -63,11 +61,10 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { @Override public <K, V> void forward(final Record<K, V> record) { final ProcessorNode<?, ?, ?, ?> previousNode = currentNode(); - final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext); try { for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) { setCurrentNode(child); - ((ProcessorNode<K, V, ?, ?>) child).process(record, recordMetadata); + ((ProcessorNode<K, V, ?, ?>) child).process(record); } } finally { setCurrentNode(previousNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 360e50e..6b1378b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; @@ -112,10 +111,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { processorContext.timestamp(), processorContext.headers() ); - ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process( - toProcess, - Optional.of(recordContext) - ); + ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess); } offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index 291a99e..84c8602 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -20,9 +20,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; - -import java.util.Optional; public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> { private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate; @@ -57,7 +54,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K } @Override - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<KIn, VIn> record) { final ProcessorRecordContext processorRecordContext = context.recordContext(); try { context.setRecordContext(new ProcessorRecordContext( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 309b813..d164428 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -38,7 +37,6 @@ import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore; @@ -263,9 +261,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final Record<K, V> record) { setCurrentNode(child); - final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(recordContext); - - child.process(record, recordMetadata); + child.process(record); if (child.isTerminalNode()) { streamTask.maybeRecordE2ELatency(record.timestamp(), currentSystemTimeMs(), child.name()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 2939525..38daa52 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -20,11 +20,10 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -32,7 +31,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; @@ -176,11 +174,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> { } - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<KIn, VIn> record) { throwIfClosed(); try { - maybeMeasureLatency(() -> processor.process(record, recordMetadata), time, processSensor); + maybeMeasureLatency(() -> processor.process(record), time, processSensor); } catch (final ClassCastException e) { final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName(); final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index f8840e4..813bcb1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -22,9 +22,6 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; - -import java.util.Optional; public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { @@ -81,7 +78,7 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut } @Override - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<KIn, VIn> record) { final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); final KIn key = record.key(); @@ -96,22 +93,13 @@ public class SinkNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut ); } - // Prefer the record metadata if defined, - // and fall back to the context (which is undefined and dummy values, - // but extractors may still depend on the current behavior. - final Optional<ProcessorRecordContext> maybeContext = - recordMetadata.map( - m -> new ProcessorRecordContext(timestamp, m.offset(), m.partition(), m.topic(), record.headers()) - ); final ProcessorRecordContext contextForExtraction = - maybeContext.orElseGet( - () -> new ProcessorRecordContext( - timestamp, - context.offset(), - context.partition(), - context.topic(), - record.headers() - ) + new ProcessorRecordContext( + timestamp, + context.offset(), + context.partition(), + context.topic(), + record.headers() ); final String topic = topicExtractor.extract(key, value, contextForExtraction); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 711b4c3..7fa8c64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -22,11 +22,8 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; -import java.util.Optional; - public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { private InternalProcessorContext context; @@ -96,7 +93,7 @@ public class SourceNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KO @Override - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<KIn, VIn> record) { context.forward(record); processAtSourceSensor.record(1.0d, context.currentSystemTimeMs()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c4e4ff3..464ac9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; @@ -56,7 +55,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -692,8 +690,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, processorContext.timestamp(), processorContext.headers() ); - final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext()); - maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor); + maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor); log.trace("Completed processing one record [{}]", record); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 7ff2c6c..38baeb6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -41,7 +41,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorTopology; @@ -78,7 +77,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Executors; @@ -869,7 +867,7 @@ public class KafkaStreamsTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { if (record.value().length() % 2 == 0) { context.forward(record.withValue(record.key() + record.value())); } @@ -970,7 +968,7 @@ public class KafkaStreamsTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName); kvStore.put(record.key(), 5L); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index b308b4f..415aaea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; @@ -60,7 +59,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import static java.util.Arrays.asList; @@ -109,7 +107,7 @@ public class StreamsBuilderTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { store.put(record.key(), record.value()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 9e9f415..ef9becf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -46,7 +45,6 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -379,7 +377,7 @@ public class TopologyTest { } @Override - public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { } + public void process(final Record<Object, Object> record) { } }; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index 6322fd2..a83c92b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -100,7 +100,7 @@ public class GlobalProcessorContextImplTest { @Test public void shouldForwardToSingleChild() { - child.process(anyObject(), anyObject()); + child.process(anyObject()); expectLastCall(); expect(recordContext.timestamp()).andStubReturn(0L); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index a4efcbc..8ff9451 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.Optional; -import java.util.Properties; - import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.StringSerializer; @@ -40,6 +37,7 @@ import org.junit.Test; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Properties; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.hamcrest.CoreMatchers.containsString; @@ -208,7 +206,7 @@ public class ProcessorNodeTest { node.init(context); final StreamsException se = assertThrows( StreamsException.class, - () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders()), Optional.ofNullable(context.recordContext())) + () -> node.process(new Record<>("aKey", "aValue", 0, new RecordHeaders())) ); assertThat(se.getCause(), instanceOf(ClassCastException.class)); assertThat(se.getMessage(), containsString("default Serdes")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 0b7c1b3..07ad044 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -55,7 +54,6 @@ import java.io.File; import java.time.Duration; import java.time.Instant; import java.util.Collections; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Supplier; @@ -777,7 +775,7 @@ public class ProcessorTopologyTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { context.forward(record); } } @@ -794,7 +792,7 @@ public class ProcessorTopologyTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { context.forward(record.withTimestamp(record.timestamp() + 10)); } } @@ -816,7 +814,7 @@ public class ProcessorTopologyTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { context.forward(record); context.forward(record.withTimestamp(record.timestamp() + 5), firstChild); context.forward(record, secondChild); @@ -833,7 +831,7 @@ public class ProcessorTopologyTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { context.forward(record.withHeaders(record.headers().add(HEADER))); } } @@ -851,7 +849,7 @@ public class ProcessorTopologyTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { context.forward(record.withValue(record.value().split("@")[0])); } } @@ -936,7 +934,7 @@ public class ProcessorTopologyTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { store.put(record.key(), record.value()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index bc6f08b..c877ab5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -52,7 +52,7 @@ public class SinkNodeTest { // When/Then context.setTime(-1); // ensures a negative timestamp is set for the record we send next try { - illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders()), java.util.Optional.empty()); + illTypedSink.process(new Record<>("any key".getBytes(), "any value".getBytes(), -1, new RecordHeaders())); fail("Should have thrown StreamsException"); } catch (final StreamsException ignored) { // expected diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index b54aa6c..e620be7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -51,7 +51,6 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -80,7 +79,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -135,7 +133,7 @@ public class StreamTaskTest { private final MockSourceNode<Integer, Integer, Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer); private final MockSourceNode<Integer, Integer, ?, ?> source3 = new MockSourceNode<Integer, Integer, Object, Object>(intDeserializer, intDeserializer) { @Override - public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<Integer, Integer> record) { throw new RuntimeException("KABOOM!"); } @@ -471,7 +469,7 @@ public class StreamTaskTest { } @Override - public void process(final Record<Integer, Integer> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<Integer, Integer> record) { if (record.key() % 2 == 0) { context.forward(record); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 6520778..4ee44ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -64,7 +64,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; @@ -94,7 +93,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -1205,7 +1203,7 @@ public class StreamThreadTest { "proc", () -> new Processor<Object, Object, Object, Object>() { @Override - public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<Object, Object> record) { if (shouldThrow.get()) { throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition)))); } else { diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index bd82e29..6086f97 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -55,7 +55,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt; @@ -300,7 +299,7 @@ public class InternalMockProcessorContext try { for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) { currentNode = childNode; - ((ProcessorNode<K, V, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext)); + ((ProcessorNode<K, V, ?, ?>) childNode).process(record); } } finally { currentNode = thisNode; @@ -337,7 +336,7 @@ public class InternalMockProcessorContext if (toInternal.child() == null || toInternal.child().equals(childNode.name())) { currentNode = childNode; final Record<Object, Object> record = new Record<>(key, value, toInternal.timestamp(), headers()); - ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record, Optional.ofNullable(recordContext)); + ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record); toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple // Processors and toInternal might have been modified } diff --git a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java index 8bed338..dd56bad 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.state.ValueAndTimestamp; import java.time.Duration; @@ -30,7 +29,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -74,7 +72,7 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI } @Override - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<KIn, VIn> record) { final KIn key = record.key(); final VIn value = record.value(); final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, record.timestamp()); diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 82b24d1..370dca7 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; import java.util.Properties; import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; @@ -84,6 +86,11 @@ public class MockInternalProcessorContext extends MockProcessorContext implement } @Override + public Optional<RecordMetadata> recordMetadata() { + return Optional.of(recordContext()); + } + + @Override public void setRecordContext(final ProcessorRecordContext recordContext) { setRecordMetadata( recordContext.topic(), diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java index 6c653c3..f18b763 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java @@ -22,17 +22,14 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.ValueAndTimestamp; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; public class MockProcessor<K, V> extends AbstractProcessor<K, V> { private final MockApiProcessor<K, V, Object, Object> delegate; - private InternalProcessorContext internalProcessorContext; public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) { @@ -47,14 +44,12 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> { @Override public void init(final ProcessorContext context) { super.init(context); - internalProcessorContext = (InternalProcessorContext) context; delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context); } @Override public void process(final K key, final V value) { - final Record<K, V> record = new Record<>(key, value, context.timestamp(), context.headers()); - delegate.process(record, Optional.ofNullable(internalProcessorContext.recordContext())); + delegate.process(new Record<>(key, value, context.timestamp(), context.headers())); } public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) { diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java index 90fd905..a75c250 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java @@ -18,12 +18,10 @@ package org.apache.kafka.test; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import java.util.Collections; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, VIn, KOut, VOut> { @@ -61,8 +59,8 @@ public class MockProcessorNode<KIn, VIn, KOut, VOut> extends ProcessorNode<KIn, } @Override - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { - processor().process(record, recordMetadata); + public void process(final Record<KIn, VIn> record) { + processor().process(record); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java index 4c3fed1..9d22e3b 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java @@ -18,12 +18,10 @@ package org.apache.kafka.test; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.SourceNode; import java.util.ArrayList; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, KOut, VOut> { @@ -42,7 +40,7 @@ public class MockSourceNode<KIn, VIn, KOut, VOut> extends SourceNode<KIn, VIn, K } @Override - public void process(final Record<KIn, VIn> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<KIn, VIn> record) { numReceived++; keys.add(record.key()); values.add(record.value()); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 48783a6..fd9fb76 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -72,7 +72,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -266,14 +265,14 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<Object, Object> record) { processedRecords.add(new TTDTestRecord( record.key(), record.value(), record.headers(), record.timestamp(), - recordMetadata.map(RecordMetadata::offset).orElse(-1L), - recordMetadata.map(RecordMetadata::topic).orElse(null) + context.recordMetadata().map(RecordMetadata::offset).orElse(-1L), + context.recordMetadata().map(RecordMetadata::topic).orElse(null) )); context.forward(record); } @@ -408,7 +407,7 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<Object, Object> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<Object, Object> record) { store.put(record.key(), record.value()); } } @@ -1461,7 +1460,7 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, Long> record) { final Long oldValue = store.get(record.key()); if (oldValue == null || record.value() > oldValue) { store.put(record.key(), record.value()); @@ -1514,7 +1513,7 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<String, Long> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, Long> record) { store.put(record.key(), record.value()); } }; @@ -1703,7 +1702,7 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { final String value = record.value(); if (!value.startsWith("recurse-")) { context.forward(record.withValue("recurse-" + value), "recursiveSink"); @@ -1761,7 +1760,7 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { stateStore.put(record.key(), record.value()); } } @@ -1777,7 +1776,7 @@ public class TopologyTestDriverTest { } @Override - public void process(final Record<String, String> record, final Optional<RecordMetadata> recordMetadata) { + public void process(final Record<String, String> record) { final String value = record.value(); if (!value.startsWith("recurse-")) { context.forward(record.withValue("recurse-" + value), "recursiveSink");
