vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761510761



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +121,28 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {
+            headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_DEFAULT);

Review comment:
       Following on the last comment (and the earlier one), I don't think we 
need the "default" version, just the "consistency" one.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;

Review comment:
       It seems like this test is just as valid with one message as with 100. 
I'm concerned that we might have a flaky test if the standby has only restored 
like 50 of the messages when we do that last assertion. It looks like all the 
safety checks before that would pass, but there's nothing that guarantees we 
have read the entire batch into the standby before we assert the position.
   
   OTOH, if we just write a single record, then it's either there or it's not.
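   
   For what it's worth, a hedged sketch of the single-record variant (the producer helper is the standard IntegrationTestUtils one; `producerProps` stands for whatever producer Properties the test already builds):
   
   ```java
   // Produce exactly one record: it is either fully restored on the standby or the store
   // lookup keeps failing, so the position assertion can't race a half-restored batch.
   IntegrationTestUtils.produceKeyValuesSynchronously(
       INPUT_TOPIC_NAME,
       Collections.singletonList(KeyValue.pair(0, 0)),
       producerProps,
       mockTime);
   ```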

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -216,6 +226,15 @@ public void put(final Bytes key,
             expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
             LOG.warn("Skipping record for expired segment.");
         } else {
+            try {
+                final InternalProcessorContext internalContext = asInternalProcessorContext(context);

Review comment:
       This is to expose `recordMetadata`, right? We should actually be able to 
migrate this class to `StateStoreContext` now and not need this cast. (The cast 
would probably break users' unit tests).
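   
   Roughly what that migration could look like (a sketch only; it assumes `StateStoreContext` exposes `recordMetadata()` per this PR's framework changes, and the field/helper names are illustrative):
   
   ```java
   // Keep the StateStoreContext handed to init() and read recordMetadata() from it directly,
   // so no cast to an internal context type is needed and users' mocked contexts keep working.
   private StateStoreContext stateStoreContext;
   
   @Override
   public void init(final StateStoreContext context, final StateStore root) {
       this.stateStoreContext = context;
       // ... existing init logic unchanged ...
   }
   
   // Called from put()/remove() instead of asInternalProcessorContext(context).recordMetadata():
   private void maybeUpdatePosition() {
       stateStoreContext.recordMetadata().ifPresent(
           meta -> position = position.update(meta.topic(), meta.partition(), meta.offset()));
   }
   ```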

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -138,8 +153,12 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         return wrapped().reverseAll();
     }
 
-    void log(final Bytes key,
-             final byte[] value) {
-        context.logChange(name(), key, value, context.timestamp());
+    @SuppressWarnings("unchecked")
+    void log(final Bytes key, final byte[] value) {
+        Optional<Position> optionalPosition = Optional.empty();
+        if (consistencyEnabled) {
+            optionalPosition = Optional.of(position);
+        }

Review comment:
       Since the processor context also checks whether the feature is enabled 
before writing headers, we can just skip this check and instead pass the 
position unconditionally.
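   
   Concretely, `log` could collapse to something like this (assuming `logChange` ends up taking the `Position` directly, as in the suggestion on the session store below):
   
   ```java
   void log(final Bytes key, final byte[] value) {
       // The processor context is the one place that knows whether consistency is enabled,
       // so the store just hands over its current position on every write.
       context.logChange(name(), key, value, context.timestamp(), position);
   }
   ```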

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
##########
@@ -27,6 +30,8 @@
 import java.util.function.BiConsumer;
 
 public class Position {
+    public static final String VECTOR_KEY = "c";

Review comment:
       We'd better put all the changelog header constants together in the same file to avoid collisions if we add more headers later.
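   
   Just to illustrate the idea (the names and the byte value below are placeholders, not what the PR currently defines):
   
   ```java
   // One possible single home, e.g. in ChangelogRecordDeserializationHelper, so a future header
   // can't accidentally reuse "c" or the version key:
   public static final String CHANGELOG_VERSION_HEADER_KEY = "v";            // assumed
   public static final String CHANGELOG_POSITION_HEADER_KEY = "c";           // today Position.VECTOR_KEY
   public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY =
       new RecordHeader(CHANGELOG_VERSION_HEADER_KEY, new byte[]{(byte) 0}); // value illustrative
   ```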

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records are under version control.
+ * Version 1: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};

Review comment:
       I think we needed both versions before, when we were thinking about 
changing the key format of the changelog, but now I think we just need one 
version. Offhand, it seems like the algorithm should be:
   
   ```
   versionHeader = record.headers().lastHeader(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_KEY);
   if (versionHeader == null) {
     return;
   } else {
     switch (versionHeader.value()[0]) {
       case 0:
         vectorHeader = record.headers().lastHeader(Position.VECTOR_KEY);
         if (vectorHeader == null) {
           throw new StreamsException("This should not happen. Consistency is enabled but the changelog " +
             "contains records without consistency information.");
         }
         position.merge(Position.deserialize(ByteBuffer.wrap(vectorHeader.value())));
         break;
       default:
         // log a warning because the changelog writer produced a record with a newer version than we understand
         // maybe we want to zero out the position, since we no longer know what position we're at
         // we probably don't want to throw an exception, since we are presumably in the middle of a rolling upgrade
         return;
     }
   }
   ```
   
   In other words, I don't think the consumer actually needs to check 
`consistencyEnabled` at all, and I don't think that we currently need the 
`CHANGELOG_VERSION_HEADER_RECORD_DEFAULT` version that's here right now.
   
   But we do actually need to have some provision for what to do if we get a 
version that's larger than the ones we know how to handle.
   
   What do you think?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -106,10 +111,13 @@
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    // VisibleForTesting
+    protected Position position;
+
     private StateStoreContext context;
-    private Position position;
 
-    RocksDBStore(final String name,
+    // VisibleForTesting
+    public RocksDBStore(final String name,

Review comment:
       I see. Once the framework PR is merged, we'll be able to get the 
position through the public API, and won't need this anymore. I think your PR 
will get merged first, so let's plan to circle back and revert some of this 
stuff after the public API is in.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -384,12 +387,14 @@ public void localSessionStoreShouldNotAllowInitOrClose() {
     }
 
     @Test
-    public void shouldNotSendRecordHeadersToChangelogTopic() {
+    public void shouldSendV0RecordHeadersToChangelogTopic() {

Review comment:
       And we probably want to add a new test that we write the consistency 
header if the flag is enabled, right?
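   
   Something along these lines, maybe (only a sketch; the consistency-header constant name and the way the position bytes are produced are assumptions, and the mocking setup from the existing test is elided):
   
   ```java
   @Test
   public void shouldSendConsistencyHeadersToChangelogTopicWhenConsistencyEnabled() {
       final byte[] positionBytes = new byte[] {1, 2, 3}; // stand-in for the serialized Position
       final Headers expectedHeaders = new RecordHeaders();
       expectedHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
       expectedHeaders.add(new RecordHeader(Position.VECTOR_KEY, positionBytes));
       // ... configure the context with consistency enabled, call logChange(..., Optional.of(position)),
       // and verify the record collector received expectedHeaders rather than null.
   }
   ```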

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
##########
@@ -79,15 +91,34 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void remove(final Windowed<Bytes> sessionKey) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata meta = context.recordMetadata().get();
+            position = position.update(meta.topic(), meta.partition(), meta.offset());
+        }
         wrapped().remove(sessionKey);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
+        Optional<Position> optionalPosition = Optional.empty();
+        if (consistencyEnabled) {
+            optionalPosition = Optional.of(position);
+        }
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp(), optionalPosition);
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata meta = context.recordMetadata().get();
+            position = position.update(meta.topic(), meta.partition(), meta.offset());
+        }
         wrapped().put(sessionKey, aggregate);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp());
+        Optional<Position> optionalPosition = Optional.empty();
+        if (consistencyEnabled) {
+            optionalPosition = Optional.of(position);
+        }
+        context.logChange(
+                name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp(), optionalPosition);

Review comment:
       Yeah, I think these will all become just one-line changes.
   
   ```suggestion
           context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp(), position);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -76,6 +87,10 @@ public long approximateNumEntries() {
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
+        if (context != null && context.recordMetadata().isPresent()) {

Review comment:
       Yeah, I do think it's easier to just have it everywhere. In the "real" 
PR, I wrapped this logic up in a utility method: 
https://github.com/apache/kafka/pull/11557/files#diff-d0743c083f89b083921f29f722b02676d55e0d602fef06954e7301afc19d1df3R56-R64
   
   The only reason it might be null is in unit tests, but we do need to support 
unit tests, since our users will have their own unit tests of their processors, 
which might have stores.
   
   I think it's obvious enough that you need to provide a context with record 
metadata if you want position tracking that I feel ok about just skipping the 
position update if the context is null or the record metadata is not present.
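   
   For reference, the utility is roughly this shape (a sketch assuming the context type exposes `recordMetadata()`; the method name is illustrative rather than the exact one in the linked PR):
   
   ```java
   // Folds the caller's current record metadata (if any) into the store's position; returns the
   // position unchanged when the context is null or no metadata is present (e.g. users' unit tests).
   public static Position updatePosition(final Position position, final StateStoreContext context) {
       if (context != null && context.recordMetadata().isPresent()) {
           final RecordMetadata meta = context.recordMetadata().get();
           return position.update(meta.topic(), meta.partition(), meta.offset());
       }
       return position;
   }
   ```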

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -947,6 +947,11 @@
         // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
         public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
 
+        // Private API used to control the usage of consistency offset vectors
+        public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"

Review comment:
       Thanks! I totally forgot about this while writing the KIP because I was 
so focused on the API. I think we'll need a public feature flag so that people 
can disable the feature if they're running on older brokers that don't support 
headers.
   
   Let's keep this for now to keep the review cycle efficient on your PR. I'll 
amend the KIP, and we can follow up with a public config.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;

Review comment:
       If you want to write 100 messages, we could also get a valid check by 
giving each record a different key or value and then waiting until we see all 
the keys (or values) before we assert the position.
   
   Then again, once the framework PR is in place, we could instead just use 
IQv2 to repeatedly query both instances and verify that we eventually get the 
right position back for both. This is another thing we should plan to circle 
back and refactor once that work is merged.
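   
   If we do keep the 100-message batch, the "wait until we see all the keys" idea might look roughly like this (hedged: `standbyStore` stands for whatever ReadOnlyKeyValueStore handle the test obtains from the standby instance):
   
   ```java
   // Block until every key in the batch is readable from the standby's store before asserting
   // the position, so a partially restored batch can't make the assertion flaky.
   TestUtils.waitForCondition(
       () -> IntStream.range(0, batch1NumMessages)
                      .allMatch(key -> standbyStore.get(key) != null),
       60_000L,
       "Standby never finished restoring the full batch of " + batch1NumMessages + " records");
   ```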

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -384,12 +387,14 @@ public void localSessionStoreShouldNotAllowInitOrClose() {
     }
 
     @Test
-    public void shouldNotSendRecordHeadersToChangelogTopic() {
+    public void shouldSendV0RecordHeadersToChangelogTopic() {

Review comment:
       We'll want to restore this test back to exactly the old version of this 
diff, to be sure we don't have a regression on older brokers.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +120,26 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {

Review comment:
       I see the value in setting this precedent, but unfortunately, we can't 
do it, due to the need to continue supporting older brokers (we support older 
versions that don't allow record headers). Instead, if `!consistencyEnabled`, 
we should just not add headers at all (i.e., we should continue to pass `null` 
as the headers).
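   
   A minimal sketch of that shape, assuming the existing `consistencyEnabled` flag (the consistency-header constant name and the Position serialization call are assumptions):
   
   ```java
   final Headers headers;
   if (consistencyEnabled) {
       final Headers consistencyHeaders = new RecordHeaders();
       consistencyHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
       position.ifPresent(p -> consistencyHeaders.add(
           new RecordHeader(Position.VECTOR_KEY, p.serialize().array()))); // serialization call assumed
       headers = consistencyHeaders;
   } else {
       // !consistencyEnabled: keep sending null headers to changelog topics (KIP-244),
       // so brokers that predate record-header support keep working.
       headers = null;
   }
   // headers is then handed to recordCollector().send(...) exactly as before.
   ```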

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -312,6 +318,11 @@ public synchronized void put(final Bytes key,
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         try (final WriteBatch batch = new WriteBatch()) {
             dbAccessor.prepareBatch(entries, batch);
+            // FIXME Will the recordMetadata be the offset of the last record in the batch?

Review comment:
       I think that in reality, we only call this method during restoration, in 
which case `context.recordMetadata` will be absent (and the position is updated 
in the RecordBatching restore callback).
   
   In theory, though, it doesn't really matter what is going on when some 
processor calls one or more state store methods. The processor is always at a 
specific position in its input topic-partition(s), and that's the position 
component that we should update.
   
   The only contract that the store needs to maintain is that it updates the 
position on every mutation.
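   
   Put concretely, only as a sketch of that contract (method body abbreviated from the diff above, not the authoritative implementation):
   
   ```java
   public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
       try (final WriteBatch batch = new WriteBatch()) {
           dbAccessor.prepareBatch(entries, batch);
           // Whoever the caller is, fold its current record metadata (when present) into the
           // store's position; during restoration recordMetadata() is absent and the restore
           // callback maintains the position from the changelog headers instead.
           if (context.recordMetadata().isPresent()) {
               final RecordMetadata meta = context.recordMetadata().get();
               position = position.update(meta.topic(), meta.partition(), meta.offset());
           }
           write(batch);
       } catch (final RocksDBException e) {
           throw new ProcessorStateException("Error while batch writing to store " + name, e);
       }
   }
   ```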




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

