aliehsaeedii commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2921520361
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +48,81 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
- // same as either timestampedStore or versionedStore above. kept merely as
a convenience
- // to simplify implementation for methods which do not depend on store
type.
private StateStore store;
+ @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
+ final StateStore rawStore = context.getStateStore(storeName);
+
+ // Check if it's an OLD TimestampedKeyValueStore that needs adaptation
+ if (rawStore instanceof TimestampedKeyValueStore &&
+ !(rawStore instanceof TimestampedKeyValueStoreWithHeaders)) {
+ // Adapt OLD store to NEW type for backward compatibility
+ headersStore = new TimestampedKeyValueStoreToHeadersAdapter<>(
Review Comment:
The adapter is needed here for 100% backward compatibility. If the user
specifies the store by `builder.addStateStore(OLD builder)`, the app breaks.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -567,7 +571,7 @@ private <K, V> void
verifyLegacyValuesWithEmptyHeaders(final K key,
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
Review Comment:
Because of the `Readonly*Facade` classes, the
`QueryableStoreTypes.keyValueStore()` does not work here any more. Those
`Facade`s convert the `ValueTimestampHeaders` to plain value and the test fails.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java:
##########
@@ -58,29 +59,22 @@ public WindowStoreMaterializer(
public StoreBuilder<?> builder() {
final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
- ? dslStoreSuppliers().windowStore(new DslWindowParams(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false,
- emitStrategy,
- false,
- storeFormat
- ))
- : (WindowBytesStoreSupplier) materialized.storeSupplier();
-
- final StoreBuilder<?> builder;
- if (storeFormat == DslStoreFormat.HEADERS) {
- builder = Stores.timestampedWindowStoreWithHeadersBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde());
- } else {
- builder = Stores.timestampedWindowStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde());
- }
Review Comment:
I think we keep the supplier as-is, but the builder must always be the new
one!
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +53,80 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
public static final long PUT_RETURN_CODE_IS_LATEST
= VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
- private TimestampedKeyValueStore<K, V> timestampedStore = null;
+ private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
private VersionedKeyValueStore<K, V> versionedStore = null;
Review Comment:
Yes the headers versioned store is not implemented yet. Will be added later.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##########
@@ -109,16 +110,18 @@ private VOut computeValue(final KIn key, final VIn value)
{
return newValue;
}
- private ValueAndTimestamp<VOut> computeValueAndTimestamp(final KIn key,
final ValueAndTimestamp<VIn> valueAndTimestamp) {
+ private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders) {
VOut newValue = null;
long timestamp = 0;
+ Headers headers = null;
- if (valueAndTimestamp != null) {
- newValue = mapper.apply(key, valueAndTimestamp.value());
- timestamp = valueAndTimestamp.timestamp();
+ if (valueTimestampHeaders != null) {
+ newValue = mapper.apply(key, valueTimestampHeaders.value());
+ timestamp = valueTimestampHeaders.timestamp();
+ headers = valueTimestampHeaders.headers();
}
- return ValueAndTimestamp.make(newValue, timestamp);
+ return ValueTimestampHeaders.make(newValue, timestamp, headers);
Review Comment:
I kept it, but should we be consistent?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -94,7 +94,7 @@ public void init(final ProcessorContext<KIn, Change<VAgg>>
context) {
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListener<>(context),
+ new TimestampedCacheFlushListenerWithHeaders<>(context),
Review Comment:
I deleted the class.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java:
##########
@@ -105,297 +101,146 @@ private void mockWindowStoreSupplier() {
}
@Test
- public void
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
+ public void
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateTimestampedBuilderWithCachingDisabled() {
+ public void shouldCreateHeadersBuilderWithCachingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
+ public void shouldCreateHeadersBuilderWithLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
+ public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as(windowStoreSupplier),
nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
final StateStore logging = caching.wrapped();
assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(CachingWindowStore.class, caching);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndLoggingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertInstanceOf(CachingWindowStore.class, caching);
- assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStore);
+ assertFalse(caching.wrapped() instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
+ public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
mockWindowStoreSupplier();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertEquals(innerWindowStore.name(), store.name());
assertFalse(wrapped instanceof CachingWindowStore);
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
- .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStore);
- }
-
- @Test
- public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(CachingWindowStore.class, wrapped);
- }
-
- @Test
- public void
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
- mockWindowStoreSupplier();
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertEquals(innerWindowStore.name(), store.name());
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithCachingAndLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof CachingWindowStore);
assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
}
@Test
- public void shouldCreateTimestampedStoreWithOnWindowClose() {
+ public void shouldCreateHeadersStoreWithOnWindowClose() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
.withCachingDisabled(), nameProvider, STORE_PREFIX);
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
-
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStore.class, store);
- assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class,
logging);
- }
-
- @Test
- public void
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
-
- final TimestampedWindowStore<String, String> store =
getTimestampedStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
- }
-
- @Test
- public void shouldCreateHeadersAwareStoreWithOnWindowClose() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
- new MaterializedInternal<>(Materialized.<String, String,
WindowStore<Bytes, byte[]>>as("store")
- .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
logging);
}
@Test
- public void
shouldCreateHeadersAwareStoreWithOnWindowCloseAndLoggingDisabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
- emitStrategy = EmitStrategy.onWindowClose();
-
- final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized = new MaterializedInternal<>(
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
- );
-
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertFalse(wrapped instanceof
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
- }
-
- @Test
- public void
shouldCreateHeadersAwareStoreWithOnWindowCloseAndCachingEnabled() {
- doReturn("headers")
-
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+ public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+ final TimestampedWindowStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
Review Comment:
I removed the assertion that test passes, I think we need to modify headers
window store for this test to pass.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]