This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0be98d7e502 MINOR: Add store type (headers, timestamped, KV) downgrade
tests (#21503)
0be98d7e502 is described below
commit 0be98d7e50234252c6e3ef31856d5cb8d3d095fa
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Feb 24 19:31:23 2026 -0800
MINOR: Add store type (headers, timestamped, KV) downgrade tests (#21503)
- Couple of smaller cleanups and adding missing test case.
- Aligning existing test for ts-store to new header-store test.
- Add an optimization for `putIfAbsent` in top level RocksDB store.
- Add new unit and integration tests for downgrading.
Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
<[email protected]>
---------
Co-authored-by: Claude <[email protected]>
---
.../integration/StoreUpgradeIntegrationTest.java | 104 +++++++++++++++++++++
.../streams/state/internals/RocksDBStore.java | 36 ++++++-
.../RocksDBTimestampedStoreWithHeaders.java | 8 +-
.../internals/RocksDBTimestampedStoreTest.java | 74 ++++++++++-----
.../RocksDBTimestampedStoreWithHeadersTest.java | 68 ++++++++++++--
5 files changed, 258 insertions(+), 32 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index f0286d4fe93..cb248138d0e 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
@@ -953,6 +954,109 @@ public class StoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
+ @Test
+ public void shouldFailDowngradeFromTimestampedToRegularKeyValueStore()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateTimestampedStore(props);
+ kafkaStreams = null;
+
+ // Attempt to downgrade to regular key-value store - this should fail
+ final StreamsBuilder streamsBuilderForRegularStore = new
StreamsBuilder();
+
+ streamsBuilderForRegularStore.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.Long()))
+ .<Integer, Integer>stream(inputStream)
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForRegularStore.build(),
props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("timestamped key-value store")
&&
+ cause.getMessage().contains("Downgrade from timestamped to
regular store is not supported")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException
about downgrade not being supported, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException to be
thrown when attempting to downgrade from timestamped to regular store");
+ }
+ }
+
+ @Test
+ public void
shouldSuccessfullyDowngradeFromTimestampedToRegularKeyValueStoreAfterCleanup()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateTimestampedStore(props);
+
+ kafkaStreams.cleanUp(); // Delete local state
+ kafkaStreams = null;
+
+ // Now downgrade to regular key-value store - this should succeed
because we cleaned up
+ final StreamsBuilder streamsBuilderForRegularStore = new
StreamsBuilder();
+
+ streamsBuilderForRegularStore.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.Long()))
+ .<Integer, Integer>stream(inputStream)
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForRegularStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyPlainCount(3, asList(
+ KeyValue.pair(1, 1L),
+ KeyValue.pair(2, 1L),
+ KeyValue.pair(3, 1L)));
+
+ kafkaStreams.close();
+ }
+
+ private void setupAndPopulateTimestampedStore(final Properties props)
throws Exception {
+ final StreamsBuilder streamsBuilderForTimestampedStore = new
StreamsBuilder();
+
+ streamsBuilderForTimestampedStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.Long()))
+ .<Integer, Integer>stream(inputStream)
+ .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new
KafkaStreams(streamsBuilderForTimestampedStore.build(), props);
+ kafkaStreams.start();
+
+ final long timestamp1 = CLUSTER.time.milliseconds();
+ processKeyValueAndVerifyCountWithTimestamp(1, timestamp1,
singletonList(
+ KeyValue.pair(1, ValueAndTimestamp.make(1L, timestamp1))));
+
+ final long timestamp2 = CLUSTER.time.milliseconds() + 10;
+ processKeyValueAndVerifyCountWithTimestamp(2, timestamp2, asList(
+ KeyValue.pair(1, ValueAndTimestamp.make(1L, timestamp1)),
+ KeyValue.pair(2, ValueAndTimestamp.make(1L, timestamp2))));
+
+ kafkaStreams.close();
+ }
+
private static class KeyValueProcessor implements Processor<Integer,
Integer, Void, Void> {
private KeyValueStore<Integer, Long> store;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d4705ad62d1..debc2078c62 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -299,6 +300,39 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
try {
final List<byte[]> allExisting =
RocksDB.listColumnFamilies(userSpecifiedOptions, absolutePath);
+ // Check for unexpected column families
+ for (final byte[] existingFamily : allExisting) {
+ final boolean isExpected = allDescriptors.stream()
+ .anyMatch(descriptor ->
Arrays.equals(descriptor.getName(), existingFamily));
+ if (!isExpected) {
+ if (Arrays.equals(existingFamily,
RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME)) {
+ throw new ProcessorStateException(
+ "Store " + name + " is a timestamped key-value
store and cannot be opened as a regular key-value store. " +
+ "Downgrade from timestamped to regular store
is not supported directly. " +
+ "To downgrade, you can delete the local state
in the state directory, and rebuild the store as regular key-value store from
the changelog.");
+ }
+ if (Arrays.equals(existingFamily,
RocksDBTimestampedStoreWithHeaders.TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME)) {
+ final boolean openingAsTimestampedStore =
allDescriptors.stream()
+ .anyMatch(descriptor ->
Arrays.equals(descriptor.getName(),
RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME));
+ if (openingAsTimestampedStore) {
+ throw new ProcessorStateException(
+ "Store " + name + " is a headers-aware
store and cannot be opened as a timestamped store. " +
+ "Downgrade from headers-aware to
timestamped store is not supported. " +
+ "To downgrade, you can delete the local
state in the state directory, and rebuild the store as timestamped store from
the changelog.");
+ } else {
+ throw new ProcessorStateException(
+ "Store " + name + " is a headers-aware
store and cannot be opened as a regular key-value store. " +
+ "Downgrade from headers-aware to regular
store is not supported.");
+ }
+ }
+
+ final String unexpectedFamily = new String(existingFamily,
StandardCharsets.UTF_8);
+ throw new ProcessorStateException(
+ "Unexpected column family '" + unexpectedFamily +
"' found in store " + name + ". " +
+ "The store may have been created with incompatible
settings.");
+ }
+ }
+
final List<ColumnFamilyDescriptor> existingDescriptors = new
LinkedList<>();
existingDescriptors.add(defaultColumnFamilyDescriptor);
existingDescriptors.addAll(extraDescriptors.stream()
@@ -391,7 +425,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
final byte[] originalValue = get(key);
- if (originalValue == null) {
+ if (originalValue == null && value != null) {
put(key, value);
}
return originalValue;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index 52e4ec68bdb..f451a202168 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -54,7 +54,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME;
- private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+ static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
"keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
public RocksDBTimestampedStoreWithHeaders(final String name,
@@ -91,7 +91,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
}
private void openInUpgradeMode(final DBOptions dbOptions,
- final ColumnFamilyOptions
columnFamilyOptions) {
+ final ColumnFamilyOptions
columnFamilyOptions) {
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
dbOptions,
// we have to open the default CF to be able to open the legacy
CF, but we won't use it
@@ -127,7 +127,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
}
private void openInRegularMode(final DBOptions dbOptions,
- final ColumnFamilyOptions columnFamilyOptions) {
+ final ColumnFamilyOptions
columnFamilyOptions) {
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
dbOptions,
// we have to open the default CF to be able to open the legacy
CF, but we won't use it
@@ -143,7 +143,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
}
private void verifyAndCloseEmptyDefaultColumnFamily(final
ColumnFamilyHandle columnFamilyHandle) {
- try (final RocksIterator defaultIter =
db.newIterator(columnFamilyHandle)) {
+ try (columnFamilyHandle; final RocksIterator defaultIter =
db.newIterator(columnFamilyHandle)) {
defaultIter.seekToFirst();
if (defaultIter.isValid()) {
throw new ProcessorStateException("Cannot upgrade directly
from key-value store to headers-aware store for " + name + ". " +
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index c3bb67cd524..2a666a2659b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.hamcrest.core.IsNull;
@@ -42,6 +43,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
@@ -163,34 +165,39 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
// should add new key8 to new CF
- rocksDBStore.put(new Bytes("key8".getBytes()),
"timestamp+88888888".getBytes());
+ rocksDBStore.put(new Bytes("key8new".getBytes()),
"timestamp+88888888".getBytes());
// one delete on old CF, one put on new CF
// approx: 3 entries on old CF, 2 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+ rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one delete on new CF (put with null value is a delete)
+ // approx: 2 entries on old CF, 1 in new CF
+ assertThat(rocksDBStore.approximateNumEntries(), is(3L));
+
// putIfAbsent()
// should migrate key4 from old to new CF with old value
assertThat(rocksDBStore.putIfAbsent(new Bytes("key4".getBytes()),
"timestamp+4444".getBytes()).length, is(8 + 4));
// one delete on old CF, one put on new CF
- // approx: 2 entries on old CF, 3 in new CF
- assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+ // approx: 1 entries on old CF, 2 in new CF
+ assertThat(rocksDBStore.approximateNumEntries(), is(3L));
// should add new key11 to new CF
- assertThat(rocksDBStore.putIfAbsent(new Bytes("key11".getBytes()),
"timestamp+11111111111".getBytes()), new IsNull<>());
+ assertThat(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()),
"timestamp+11111111111".getBytes()), new IsNull<>());
// one delete on old CF, one put on new CF
- // approx: 1 entries on old CF, 4 in new CF
- assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+ // approx: 0 entries on old CF, 3 in new CF
+ assertThat(rocksDBStore.approximateNumEntries(), is(3L));
// should not delete key5 but migrate to new CF
assertThat(rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()),
null).length, is(8 + 5));
// one delete on old CF, one put on new CF
- // approx: 0 entries on old CF, 5 in new CF
- assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+ // approx: 0 entries on old CF, 4 in new CF
+ assertThat(rocksDBStore.approximateNumEntries(), is(4L));
// should be no-op on both CF
- assertThat(rocksDBStore.putIfAbsent(new Bytes("key12".getBytes()),
null), new IsNull<>());
- // two delete operation, however, only one is counted because old CF
count was zero before already
+ assertThat(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()),
null), new IsNull<>());
+ // one delete operation, however, not counted because old CF count was
zero before already
// approx: 0 entries on old CF, 4 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(4L));
@@ -221,7 +228,7 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
- assertArrayEquals("key11".getBytes(), keyValue.key.get());
+ assertArrayEquals("key11new".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't',
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'},
keyValue.value);
}
{
@@ -249,7 +256,7 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
- assertArrayEquals("key8".getBytes(), keyValue.key.get());
+ assertArrayEquals("key8new".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't',
'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
}
assertFalse(itAll.hasNext());
@@ -280,7 +287,7 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
try (final KeyValueIterator<Bytes, byte[]> itAll =
rocksDBStore.reverseAll()) {
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
- assertArrayEquals("key8".getBytes(), keyValue.key.get());
+ assertArrayEquals("key8new".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't',
'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
}
{
@@ -308,7 +315,7 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
- assertArrayEquals("key11".getBytes(), keyValue.key.get());
+ assertArrayEquals("key11new".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't',
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'},
keyValue.value);
}
{
@@ -351,7 +358,7 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
}
{
final KeyValue<Bytes, byte[]> keyValue = it.next();
- assertArrayEquals("key11".getBytes(), keyValue.key.get());
+ assertArrayEquals("key11new".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't',
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'},
keyValue.value);
}
assertFalse(it.hasNext());
@@ -388,9 +395,10 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new
IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new
IsNull<>());
assertThat(db.get(noTimestampColumnFamily,
"key7".getBytes()).length, is(7));
- assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new
IsNull<>());
- assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()),
new IsNull<>());
- assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()),
new IsNull<>());
+ assertThat(db.get(noTimestampColumnFamily, "key8new".getBytes()),
new IsNull<>());
+ assertThat(db.get(noTimestampColumnFamily, "key9new".getBytes()),
new IsNull<>());
+ assertThat(db.get(noTimestampColumnFamily, "key11new".getBytes()),
new IsNull<>());
+ assertThat(db.get(noTimestampColumnFamily, "key12new".getBytes()),
new IsNull<>());
assertThat(db.get(withTimestampColumnFamily,
"unknown".getBytes()), new IsNull<>());
assertThat(db.get(withTimestampColumnFamily,
"key1".getBytes()).length, is(8 + 1));
@@ -400,9 +408,10 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
assertThat(db.get(withTimestampColumnFamily,
"key5".getBytes()).length, is(8 + 5));
assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()),
new IsNull<>());
assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()),
new IsNull<>());
- assertThat(db.get(withTimestampColumnFamily,
"key8".getBytes()).length, is(18));
- assertThat(db.get(withTimestampColumnFamily,
"key11".getBytes()).length, is(21));
- assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()),
new IsNull<>());
+ assertThat(db.get(withTimestampColumnFamily,
"key8new".getBytes()).length, is(18));
+ assertThat(db.get(noTimestampColumnFamily, "key9new".getBytes()),
new IsNull<>());
+ assertThat(db.get(withTimestampColumnFamily,
"key11new".getBytes()).length, is(21));
+ assertThat(db.get(withTimestampColumnFamily,
"key12new".getBytes()), new IsNull<>());
} catch (final RuntimeException fatal) {
errorOccurred = true;
} finally {
@@ -464,6 +473,29 @@ public class RocksDBTimestampedStoreTest extends
RocksDBStoreTest {
}
}
+ @Test
+ public void shouldNotSupportDowngradeFromTimestampedToPlainKeyValueStore()
{
+ rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.put(new Bytes("key1".getBytes()),
"timestamped-value1".getBytes());
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"timestamped-value2".getBytes());
+ rocksDBStore.close();
+
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ try {
+ final ProcessorStateException exception = assertThrows(
+ ProcessorStateException.class,
+ () -> kvStore.init(context, kvStore)
+ );
+
+ assertThat(exception.getMessage(), is(
+ "Store " + DB_NAME + " is a timestamped key-value store and
cannot be opened as a regular key-value store. " +
+ "Downgrade from timestamped to regular store is not supported
directly. " +
+ "To downgrade, you can delete the local state in the state
directory, and rebuild the store as regular key-value store from the
changelog."));
+ } finally {
+ kvStore.close();
+ }
+ }
+
private void prepareOldStore() {
final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME,
METRICS_SCOPE);
try {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index 98a74df76c4..ab863d4e5fc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -39,6 +39,7 @@ import org.rocksdb.RocksIterator;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static java.util.Arrays.asList;
@@ -177,19 +178,24 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
// one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on legacy CF, 2 in headers-aware CF after adding new key8new with
put()");
+ rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one delete on new CF (put with null value is a
delete), but count is off by two due to deletes not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 2
entries on legacy CF, 1 in headers-aware CF after deleting non-existing key9new
with put(null)");
+
// putIfAbsent() - tests migration on conditional write
assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()),
"headers+timestamp+11111111111".getBytes()),
"Expected null return value for putIfAbsent on non-existing
key11new, and new key should be added to headers-aware CF");
// one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
- assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 2
entries on legacy CF, 3 in headers-aware CF after adding new key11new with
putIfAbsent()");
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 1
entries on legacy CF, 2 in headers-aware CF after adding new key11new with
putIfAbsent()");
assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new
Bytes("key5".getBytes()), null).length,
"Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
- assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 1
entry on legacy CF, 4 in headers-aware CF after migrating key5 with
putIfAbsent(null)");
+ // one delete on old CF, one put on new CF, due to `get()` migration
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 0
entry on legacy CF, 3 in headers-aware CF after migrating key5 with
putIfAbsent(null)");
assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()),
null));
- // two delete operation, however, only one is counted because old CF
count can not be less than 0
+ // no delete operation, because key12new is unknown
assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 0
entries on legacy CF, 3 in headers-aware CF after putIfAbsent with null on
non-existing key12new");
// delete() - tests migration on delete
@@ -410,7 +416,10 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
assertNull(db.get(legacyTimestampedColumnFamily, "key6".getBytes()));
// migrated
assertEquals(8 + 7, db.get(legacyTimestampedColumnFamily,
"key7".getBytes()).length); // not migrated
assertNull(db.get(legacyTimestampedColumnFamily,
"key8new".getBytes()));
+ assertNull(db.get(legacyTimestampedColumnFamily,
"key9new".getBytes()));
assertNull(db.get(legacyTimestampedColumnFamily,
"key11new".getBytes()));
+ assertNull(db.get(legacyTimestampedColumnFamily,
"key12new".getBytes()));
+
}
private void verifyHeadersColumnFamily(final RocksDB db, final
ColumnFamilyHandle headersColumnFamily) throws Exception {
@@ -424,6 +433,7 @@ public class RocksDBTimestampedStoreWithHeadersTest extends
RocksDBStoreTest {
assertNull(db.get(headersColumnFamily, "key6".getBytes())); //
migrated by delete() => deleted
assertNull(db.get(headersColumnFamily, "key7".getBytes())); // not
migrated, should still be in legacy column family
assertEquals("headers+timestamp+88888888".getBytes().length,
db.get(headersColumnFamily, "key8new".getBytes()).length); // added by put() =>
value is inserted without any conversion
+ assertNull(db.get(headersColumnFamily, "key9new".getBytes()));
assertEquals("headers+timestamp+11111111111".getBytes().length,
db.get(headersColumnFamily, "key11new".getBytes()).length); // inserted (newly
added) by putIfAbsent() => value is inserted without any conversion
assertNull(db.get(headersColumnFamily, "key12new".getBytes())); //
putIfAbsent with null value on non-existing key should not create any entry
}
@@ -525,11 +535,11 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
boolean hasLegacy = false;
for (final byte[] cf : existingCFs) {
- if (java.util.Arrays.equals(cf,
RocksDB.DEFAULT_COLUMN_FAMILY)) {
+ if (Arrays.equals(cf, RocksDB.DEFAULT_COLUMN_FAMILY)) {
hasDefault = true;
- } else if (java.util.Arrays.equals(cf,
"keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8))) {
+ } else if (Arrays.equals(cf,
"keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8))) {
hasHeadersAware = true;
- } else if (java.util.Arrays.equals(cf,
"keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8))) {
+ } else if (Arrays.equals(cf,
"keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8))) {
hasLegacy = true;
}
}
@@ -549,6 +559,52 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
}
+ @Test
+ public void shouldNotSupportDowngradeFromHeadersAwareToRegularStore() {
+ // prepare headers-aware store with data
+ rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.put(new Bytes("key1".getBytes()),
"headers-aware-value1".getBytes());
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"headers-aware-value2".getBytes());
+ rocksDBStore.close();
+
+ final RocksDBStore regularStore = new RocksDBStore(DB_NAME,
METRICS_SCOPE);
+ try {
+ final ProcessorStateException exception = assertThrows(
+ ProcessorStateException.class,
+ () -> regularStore.init(context, regularStore)
+ );
+
+ assertTrue(exception.getMessage().contains("Store " + DB_NAME + "
is a headers-aware store"));
+ assertTrue(exception.getMessage().contains("cannot be opened as a
regular key-value store"));
+ assertTrue(exception.getMessage().contains("Downgrade from
headers-aware to regular store is not supported"));
+ } finally {
+ regularStore.close();
+ }
+ }
+
+ @Test
+ public void shouldNotSupportDowngradeFromHeadersAwareToTimestampedStore() {
+ rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.put(new Bytes("key1".getBytes()),
"headers-aware-value1".getBytes());
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"headers-aware-value2".getBytes());
+ rocksDBStore.close();
+
+ final RocksDBTimestampedStore timestampedStore = new
RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
+ try {
+ final ProcessorStateException exception = assertThrows(
+ ProcessorStateException.class,
+ () -> timestampedStore.init(context, timestampedStore)
+ );
+
+ assertTrue(exception.getMessage().contains("Store " + DB_NAME + "
is a headers-aware store"));
+ assertTrue(exception.getMessage().contains("cannot be opened as a
timestamped store"));
+ assertTrue(exception.getMessage().contains("Downgrade from
headers-aware to timestamped store is not supported"));
+ assertTrue(exception.getMessage().contains("To downgrade, you can
delete the local state in the state directory, and rebuild the store as
timestamped store from the changelog"));
+ } finally {
+ timestampedStore.close();
+ }
+ }
+
private void prepareKeyValueStore() {
// Create a plain RocksDBStore (key-value, not timestamped) with data
in default column family
final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);