vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755098773



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +120,26 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {
+            headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_DEFAULT);
+        } else {
+            // Add the vector clock to the header part of every record

Review comment:
       We know this becomes inefficient when there are multiple topics with 
long names, because the topic names are written into the header of every 
record. One option is to map the topic names to bytes to save space. The other 
is to not write the entire vector clock with every record; that second approach 
is still under design.
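
To make the first option concrete, here is a rough sketch (not code from this PR). It assumes a position can be modeled as a topic -> partition -> offset map. `PositionEncodingSketch`, `encodeWithNames`, `encodeWithIds`, and the `topicIds` dictionary are hypothetical names, and the byte layout is invented purely to compare sizes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka Streams code: a "position" is modeled as
// topic -> (partition -> offset), and the two encoders only compare header sizes.
public class PositionEncodingSketch {

    private static final int ENTRY_BYTES = Integer.BYTES + Long.BYTES; // partition + offset

    // Baseline: every topic name is written in full (length-prefixed UTF-8).
    static byte[] encodeWithNames(final Map<String, Map<Integer, Long>> position) {
        int size = Integer.BYTES; // topic count
        for (final Map.Entry<String, Map<Integer, Long>> topic : position.entrySet()) {
            size += Integer.BYTES + topic.getKey().getBytes(StandardCharsets.UTF_8).length
                + Integer.BYTES + topic.getValue().size() * ENTRY_BYTES;
        }
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(position.size());
        for (final Map.Entry<String, Map<Integer, Long>> topic : position.entrySet()) {
            final byte[] name = topic.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(name.length).put(name).putInt(topic.getValue().size());
            for (final Map.Entry<Integer, Long> entry : topic.getValue().entrySet()) {
                buf.putInt(entry.getKey()).putLong(entry.getValue());
            }
        }
        return buf.array();
    }

    // Alternative: each topic name is replaced by a one-byte id taken from a
    // dictionary (assumed to be known to whoever reads the header back).
    static byte[] encodeWithIds(final Map<String, Map<Integer, Long>> position,
                                final Map<String, Byte> topicIds) {
        int size = Integer.BYTES; // topic count
        for (final Map<Integer, Long> partitions : position.values()) {
            size += Byte.BYTES + Integer.BYTES + partitions.size() * ENTRY_BYTES;
        }
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(position.size());
        for (final Map.Entry<String, Map<Integer, Long>> topic : position.entrySet()) {
            final Byte id = topicIds.get(topic.getKey());
            if (id == null) {
                throw new IllegalArgumentException("No id for topic " + topic.getKey());
            }
            buf.put(id.byteValue()).putInt(topic.getValue().size());
            for (final Map.Entry<Integer, Long> entry : topic.getValue().entrySet()) {
                buf.putInt(entry.getKey()).putLong(entry.getValue());
            }
        }
        return buf.array();
    }

    public static void main(final String[] args) {
        final Map<String, Map<Integer, Long>> position = new TreeMap<>();
        position.computeIfAbsent("orders-by-customer-region", t -> new TreeMap<>()).put(0, 42L);
        position.computeIfAbsent("payments-by-customer-region", t -> new TreeMap<>()).put(3, 7L);

        final Map<String, Byte> topicIds = new TreeMap<>();
        topicIds.put("orders-by-customer-region", (byte) 0);
        topicIds.put("payments-by-customer-region", (byte) 1);

        System.out.println("names: " + encodeWithNames(position).length + " bytes");
        System.out.println("ids:   " + encodeWithIds(position, topicIds).length + " bytes");
    }
}
```

The saving per record is roughly the total length of the topic names, so it grows with the number and length of the topics. The cost is that the name-to-id dictionary has to be stored or derived somewhere both the writer and the reader can reach, which this sketch does not address.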




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

