This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 4d65ecb KAFKA-12435: Fix javadoc errors (#10392)
4d65ecb is described below
commit 4d65ecb70956e34f238ed3dd6b5baa75c64537d3
Author: John Roesler <[email protected]>
AuthorDate: Wed Mar 24 13:55:27 2021 -0500
KAFKA-12435: Fix javadoc errors (#10392)
There were errors while generating javadoc for the streams:test-utils module
because the included TopologyTestDriver imported some excluded classes.
This fixes the errors by inlining the previously excluded packages.
Reviewers: Chia-Ping Tsai <[email protected]>, Ismael Juma
<[email protected]>
---
build.gradle | 1 -
.../processor/internals/StreamsProducer.java | 2 +-
.../apache/kafka/streams/TopologyTestDriver.java | 174 ++++++++++++++++++++-
.../streams/internals/KeyValueStoreFacade.java | 99 ------------
.../kafka/streams/internals/WindowStoreFacade.java | 109 -------------
.../processor/internals/TestDriverProducer.java | 44 ------
.../{internals => }/KeyValueStoreFacadeTest.java | 4 +-
.../{internals => }/WindowStoreFacadeTest.java | 3 +-
8 files changed, 176 insertions(+), 260 deletions(-)
diff --git a/build.gradle b/build.gradle
index d62b56f..1648605 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1602,7 +1602,6 @@ project(':streams:test-utils') {
javadoc {
include "**/org/apache/kafka/streams/test/**"
- exclude "**/org/apache/kafka/streams/internals/**",
"**/org/apache/kafka/streams/**/internals/**"
}
tasks.create(name: "copyDependantLibs", type: Copy) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index f6d18a3..1102aab 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -237,7 +237,7 @@ public class StreamsProducer {
* @throws IllegalStateException if EOS is disabled
* @throws TaskMigratedException
*/
- void commitTransaction(final Map<TopicPartition, OffsetAndMetadata>
offsets,
+ protected void commitTransaction(final Map<TopicPartition,
OffsetAndMetadata> offsets,
final ConsumerGroupMetadata consumerGroupMetadata) {
if (!eosEnabled()) {
throw new IllegalStateException(formatException("Exactly-once is
not enabled"));
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7a97a19..35bae58 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -41,8 +42,13 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.internals.KeyValueStoreFacade;
-import org.apache.kafka.streams.internals.WindowStoreFacade;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.StreamsProducer;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
+import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -66,7 +72,6 @@ import
org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
-import org.apache.kafka.streams.processor.internals.TestDriverProducer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -99,6 +104,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -108,6 +114,7 @@ import java.util.regex.Pattern;
import static
org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.AT_LEAST_ONCE;
import static
org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static
org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
/**
* This class makes it easier to write tests to verify the behavior of
topologies created with {@link Topology} or
@@ -1264,4 +1271,165 @@ public class TopologyTestDriver implements Closeable {
throw new UnsupportedOperationException();
}
}
+
+ static class KeyValueStoreFacade<K, V> extends
ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
+
+ public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner)
{
+ super(inner);
+ }
+
+ @Deprecated
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore
root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void put(final K key,
+ final V value) {
+ inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP));
+ }
+
+ @Override
+ public V putIfAbsent(final K key,
+ final V value) {
+ return getValueOrNull(inner.putIfAbsent(key,
ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP)));
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<K, V>> entries) {
+ for (final KeyValue<K, V> entry : entries) {
+ inner.put(entry.key, ValueAndTimestamp.make(entry.value,
ConsumerRecord.NO_TIMESTAMP));
+ }
+ }
+
+ @Override
+ public V delete(final K key) {
+ return getValueOrNull(inner.delete(key));
+ }
+
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen();
+ }
+ }
+
+ static class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K,
V> implements WindowStore<K, V> {
+
+ public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
+ super(store);
+ }
+
+ @Deprecated
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore
root) {
+ inner.init(context, root);
+ }
+
+ @Deprecated
+ @Override
+ public void put(final K key,
+ final V value) {
+ inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP));
+ }
+
+ @Override
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
+ inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
+ }
+
+ @Override
+ public WindowStoreIterator<V> backwardFetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
+ return backwardFetch(key, Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+ final K keyTo,
+ final long
timeFrom,
+ final long
timeTo) {
+ return backwardFetch(keyFrom, keyTo,
Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long
timeFrom, final long timeTo) {
+ return backwardFetchAll(Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
+ }
+
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen();
+ }
+ }
+
+ private static class TestDriverProducer extends StreamsProducer {
+
+ public TestDriverProducer(final StreamsConfig config,
+ final KafkaClientSupplier clientSupplier,
+ final LogContext logContext) {
+ super(config, "TopologyTestDriver-Thread", clientSupplier, new
TaskId(0, 0), UUID.randomUUID(), logContext);
+ }
+
+ @Override
+ public void commitTransaction(final Map<TopicPartition,
OffsetAndMetadata> offsets,
+ final ConsumerGroupMetadata
consumerGroupMetadata) throws ProducerFencedException {
+ super.commitTransaction(offsets, consumerGroupMetadata);
+ }
+ }
}
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
deleted file mode 100644
index 94b5c8e..0000000
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
-
-import java.util.List;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-
-public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K,
V> implements KeyValueStore<K, V> {
-
- public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) {
- super(inner);
- }
-
- @Deprecated
- @Override
- public void init(final ProcessorContext context,
- final StateStore root) {
- inner.init(context, root);
- }
-
- @Override
- public void init(final StateStoreContext context, final StateStore root) {
- inner.init(context, root);
- }
-
- @Override
- public void put(final K key,
- final V value) {
- inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP));
- }
-
- @Override
- public V putIfAbsent(final K key,
- final V value) {
- return getValueOrNull(inner.putIfAbsent(key,
ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP)));
- }
-
- @Override
- public void putAll(final List<KeyValue<K, V>> entries) {
- for (final KeyValue<K, V> entry : entries) {
- inner.put(entry.key, ValueAndTimestamp.make(entry.value,
ConsumerRecord.NO_TIMESTAMP));
- }
- }
-
- @Override
- public V delete(final K key) {
- return getValueOrNull(inner.delete(key));
- }
-
- @Override
- public void flush() {
- inner.flush();
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public boolean isOpen() {
- return inner.isOpen();
- }
-}
\ No newline at end of file
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
deleted file mode 100644
index 342817e..0000000
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
-
-import java.time.Instant;
-
-public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V>
implements WindowStore<K, V> {
-
- public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
- super(store);
- }
-
- @Deprecated
- @Override
- public void init(final ProcessorContext context,
- final StateStore root) {
- inner.init(context, root);
- }
-
- @Override
- public void init(final StateStoreContext context, final StateStore root) {
- inner.init(context, root);
- }
-
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP));
- }
-
- @Override
- public void put(final K key,
- final V value,
- final long windowStartTimestamp) {
- inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
- }
-
- @Override
- public WindowStoreIterator<V> backwardFetch(final K key,
- final long timeFrom,
- final long timeTo) {
- return backwardFetch(key, Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo) {
- return backwardFetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long
timeFrom, final long timeTo) {
- return backwardFetchAll(Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
- }
-
- @Override
- public void flush() {
- inner.flush();
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public boolean isOpen() {
- return inner.isOpen();
- }
-}
\ No newline at end of file
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java
deleted file mode 100644
index 14e5e135..0000000
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/internals/TestDriverProducer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.KafkaClientSupplier;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.TaskId;
-
-import java.util.Map;
-import java.util.UUID;
-
-public class TestDriverProducer extends StreamsProducer {
-
- public TestDriverProducer(final StreamsConfig config,
- final KafkaClientSupplier clientSupplier,
- final LogContext logContext) {
- super(config, "TopologyTestDriver-Thread", clientSupplier, new
TaskId(0, 0), UUID.randomUUID(), logContext);
- }
-
- @Override
- public void commitTransaction(final Map<TopicPartition, OffsetAndMetadata>
offsets,
- final ConsumerGroupMetadata
consumerGroupMetadata) throws ProducerFencedException {
- super.commitTransaction(offsets, consumerGroupMetadata);
- }
-}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
similarity index 98%
rename from
streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
rename to
streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
index 18652b9..4b45477 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.internals;
+package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade;
import org.easymock.EasyMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
similarity index 98%
rename from
streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
rename to
streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index 698355d..d184f98 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.internals;
+package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.TopologyTestDriver.WindowStoreFacade;
import org.easymock.EasyMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;