This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 93dca8e KAFKA-12749: Changelog topic config on suppressed KTable lost
(#10664)
93dca8e is described below
commit 93dca8ebd973f7a49280aa216137f685c00030e3
Author: Viswanathan Ranganathan <[email protected]>
AuthorDate: Thu Jun 3 11:00:19 2021 -0700
KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
Refactored the buffer configs so that logConfig is carried over when calling
shutDownWhenFull or emitEarlyWhenFull. Removed the constructors of
EagerBufferConfigImpl and StrictBufferConfigImpl that don't accept a logConfig
parameter, so callers are forced to specify it explicitly, even when it is
empty/unspecified.
Co-authored-by: Bruno Cadonna <[email protected]>
Reviewers: Walker Carlson <[email protected]>, Bruno Cadonna
<[email protected]>
---
.../apache/kafka/streams/kstream/Suppressed.java | 5 +-
.../internals/suppress/BufferConfigInternal.java | 7 +-
.../internals/suppress/EagerBufferConfigImpl.java | 22 +++--
.../internals/suppress/StrictBufferConfigImpl.java | 21 ++---
.../kafka/streams/kstream/SuppressedTest.java | 93 ++++++++++++++++++++--
5 files changed, 111 insertions(+), 37 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 2f96993..31a53ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -22,6 +22,7 @@ import
org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImp
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import java.time.Duration;
+import java.util.Collections;
import java.util.Map;
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
@@ -48,7 +49,7 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
* Create a size-constrained buffer in terms of the maximum number of
keys it will store.
*/
static EagerBufferConfig maxRecords(final long recordLimit) {
- return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+ return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE,
Collections.emptyMap());
}
/**
@@ -60,7 +61,7 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
* Create a size-constrained buffer in terms of the maximum number of
bytes it will use.
*/
static EagerBufferConfig maxBytes(final long byteLimit) {
- return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+ return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit,
Collections.emptyMap());
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 74de6ef..800a2a5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -35,18 +35,19 @@ public abstract class BufferConfigInternal<BC extends
Suppressed.BufferConfig<BC
return new StrictBufferConfigImpl(
Long.MAX_VALUE,
Long.MAX_VALUE,
- SHUT_DOWN // doesn't matter, given the bounds
+ SHUT_DOWN, // doesn't matter, given the bounds
+ getLogConfig()
);
}
@Override
public Suppressed.StrictBufferConfig shutDownWhenFull() {
- return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN);
+ return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN,
getLogConfig());
}
@Override
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
- return new EagerBufferConfigImpl(maxRecords(), maxBytes());
+ return new EagerBufferConfigImpl(maxRecords(), maxBytes(),
getLogConfig());
}
public abstract boolean isLoggingEnabled();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index c56532d..7665e66 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -28,15 +28,9 @@ public class EagerBufferConfigImpl extends
BufferConfigInternal<Suppressed.Eager
private final long maxBytes;
private final Map<String, String> logConfig;
- public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
- this.maxRecords = maxRecords;
- this.maxBytes = maxBytes;
- this.logConfig = Collections.emptyMap();
- }
-
- private EagerBufferConfigImpl(final long maxRecords,
- final long maxBytes,
- final Map<String, String> logConfig) {
+ public EagerBufferConfigImpl(final long maxRecords,
+ final long maxBytes,
+ final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = logConfig;
@@ -97,16 +91,20 @@ public class EagerBufferConfigImpl extends
BufferConfigInternal<Suppressed.Eager
}
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
- maxBytes == that.maxBytes;
+ maxBytes == that.maxBytes &&
+ Objects.equals(getLogConfig(), that.getLogConfig());
}
@Override
public int hashCode() {
- return Objects.hash(maxRecords, maxBytes);
+ return Objects.hash(maxRecords, maxBytes, getLogConfig());
}
@Override
public String toString() {
- return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ",
maxBytes=" + maxBytes + '}';
+ return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
+ ", maxBytes=" + maxBytes +
+ ", logConfig=" + getLogConfig() +
+ "}";
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 13ffccd..2ca5ef9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -41,14 +41,6 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
this.logConfig = logConfig;
}
- public StrictBufferConfigImpl(final long maxRecords,
- final long maxBytes,
- final BufferFullStrategy bufferFullStrategy)
{
- this.maxRecords = maxRecords;
- this.maxBytes = maxBytes;
- this.bufferFullStrategy = bufferFullStrategy;
- this.logConfig = Collections.emptyMap();
- }
public StrictBufferConfigImpl() {
this.maxRecords = Long.MAX_VALUE;
@@ -59,12 +51,12 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
@Override
public Suppressed.StrictBufferConfig withMaxRecords(final long
recordLimit) {
- return new StrictBufferConfigImpl(recordLimit, maxBytes,
bufferFullStrategy);
+ return new StrictBufferConfigImpl(recordLimit, maxBytes,
bufferFullStrategy, getLogConfig());
}
@Override
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
- return new StrictBufferConfigImpl(maxRecords, byteLimit,
bufferFullStrategy);
+ return new StrictBufferConfigImpl(maxRecords, byteLimit,
bufferFullStrategy, getLogConfig());
}
@Override
@@ -113,18 +105,21 @@ public class StrictBufferConfigImpl extends
BufferConfigInternal<Suppressed.Stri
final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes &&
- bufferFullStrategy == that.bufferFullStrategy;
+ bufferFullStrategy == that.bufferFullStrategy &&
+ Objects.equals(getLogConfig(), ((StrictBufferConfigImpl)
o).getLogConfig());
}
@Override
public int hashCode() {
- return Objects.hash(maxRecords, maxBytes, bufferFullStrategy);
+ return Objects.hash(maxRecords, maxBytes, bufferFullStrategy,
getLogConfig());
}
@Override
public String toString() {
return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
", maxBytes=" + maxBytes +
- ", bufferFullStrategy=" + bufferFullStrategy + '}';
+ ", bufferFullStrategy=" + bufferFullStrategy +
+ ", logConfig=" + getLogConfig().toString() +
+ '}';
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
index 112b9eb..b799884 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -22,6 +22,8 @@ import
org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImp
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.junit.Test;
+import java.util.Collections;
+
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
import static
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
@@ -46,13 +48,19 @@ public class SuppressedTest {
assertThat(
"keys alone should be set",
maxRecords(2L),
- is(new EagerBufferConfigImpl(2L, MAX_VALUE))
+ is(new EagerBufferConfigImpl(2L, MAX_VALUE,
Collections.emptyMap()))
);
assertThat(
"size alone should be set",
maxBytes(2L),
- is(new EagerBufferConfigImpl(MAX_VALUE, 2L))
+ is(new EagerBufferConfigImpl(MAX_VALUE, 2L,
Collections.emptyMap()))
+ );
+
+ assertThat(
+ "config should be set even after max records",
+
maxRecords(2L).withMaxBytes(4L).withLoggingEnabled(Collections.singletonMap("myConfigKey",
"myConfigValue")),
+ is(new EagerBufferConfigImpl(2L, 4L,
Collections.singletonMap("myConfigKey", "myConfigValue")))
);
}
@@ -91,7 +99,13 @@ public class SuppressedTest {
assertThat(
"all constraints should be set",
untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
- is(new SuppressedInternal<>(null, ofMillis(2), new
EagerBufferConfigImpl(3L, 2L), null, false))
+ is(new SuppressedInternal<>(null, ofMillis(2), new
EagerBufferConfigImpl(3L, 2L, Collections.emptyMap()), null, false))
+ );
+
+ assertThat(
+ "config is not lost early emit is set",
+ untilTimeLimit(ofMillis(2),
maxRecords(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey",
"myConfigValue")).emitEarlyWhenFull()),
+ is(new SuppressedInternal<>(null, ofMillis(2), new
EagerBufferConfigImpl(2L, MAX_VALUE, Collections.singletonMap("myConfigKey",
"myConfigValue")), null, false))
);
}
@@ -105,13 +119,13 @@ public class SuppressedTest {
assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()),
- is(new FinalResultsSuppressionBuilder<>(null, new
StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>(null, new
StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
)
);
assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()),
- is(new FinalResultsSuppressionBuilder<>(null, new
StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>(null, new
StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
)
);
@@ -122,14 +136,79 @@ public class SuppressedTest {
assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()).withName("name"),
- is(new FinalResultsSuppressionBuilder<>("name", new
StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>("name", new
StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
)
);
assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()).withName("name"),
- is(new FinalResultsSuppressionBuilder<>("name", new
StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+ is(new FinalResultsSuppressionBuilder<>("name", new
StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
)
);
+
+ assertThat(
+ "config is not lost when shutdown when full is set",
+
untilWindowCloses(maxBytes(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey",
"myConfigValue")).shutDownWhenFull()),
+ is(new FinalResultsSuppressionBuilder<>(null, new
StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN,
Collections.singletonMap("myConfigKey", "myConfigValue"))))
+ );
+ }
+
+ @Test
+ public void supportLongChainOfMethods() {
+ final Suppressed.BufferConfig<Suppressed.EagerBufferConfig>
bufferConfig = unbounded()
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L);
+
+ assertThat(
+ "long chain of eager buffer config sets attributes properly",
+ bufferConfig,
+ is(new EagerBufferConfigImpl(5L, 6L, Collections.emptyMap()))
+ );
+ assertThat(
+ "long chain of strict buffer config sets attributes properly",
+ bufferConfig.shutDownWhenFull(),
+ is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN,
Collections.emptyMap()))
+ );
+
+ final Suppressed.BufferConfig<Suppressed.EagerBufferConfig>
bufferConfigWithLogging = unbounded()
+ .withLoggingEnabled(Collections.singletonMap("myConfigKey",
"myConfigValue"))
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L);
+
+ assertThat(
+ "long chain of eager buffer config sets attributes properly with
logging enabled",
+ bufferConfigWithLogging,
+ is(new EagerBufferConfigImpl(5L, 6L,
Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
+ assertThat(
+ "long chain of strict buffer config sets attributes properly with
logging enabled",
+ bufferConfigWithLogging.shutDownWhenFull(),
+ is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN,
Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
+
+ final Suppressed.BufferConfig<Suppressed.EagerBufferConfig>
bufferConfigWithLoggingCalledAtTheEnd = unbounded()
+ .emitEarlyWhenFull()
+ .withMaxRecords(3L)
+ .withMaxBytes(4L)
+ .withMaxRecords(5L)
+ .withMaxBytes(6L)
+ .withLoggingEnabled(Collections.singletonMap("myConfigKey",
"myConfigValue"));
+
+ assertThat(
+ "long chain of eager buffer config sets logging even after other
setters",
+ bufferConfigWithLoggingCalledAtTheEnd,
+ is(new EagerBufferConfigImpl(5L, 6L,
Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
+ assertThat(
+ "long chain of strict buffer config sets logging even after other
setters",
+ bufferConfigWithLoggingCalledAtTheEnd.shutDownWhenFull(),
+ is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN,
Collections.singletonMap("myConfigKey", "myConfigValue")))
+ );
}
}