guozhangwang commented on a change in pull request #8103:
URL: https://github.com/apache/kafka/pull/8103#discussion_r415156119



##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -109,6 +109,19 @@
     public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a 
message will remain " +
         "ineligible for compaction in the log. Only applicable for logs that 
are being compacted.";
 
+    public static final String COMPACTION_STRATEGY_CONFIG = 
"compaction.strategy";
+    public static final String COMPACTION_STRATEGY_DOC = "The retention 
strategy to use when compacting the log. " +
+       "Only applicable for logs that are being compacted. " + 
+       "By default the compaction strategy is set to \"offset\" where the 
latest offset for the key is retained. " + 
+       "For \"header\" strategy, the value provided by the producer in the 
record header will be used to determine " +
+       "the latest record for the key. For \"timestamp\" strategy, the record 
tiemstamp will be used to determine the " +
+       "latest record for the key. So setting the strategy to anything other 
than \"offset\" will replace the offset " +
+       "when calculating which records to retain for the value (i.e. provided 
by the producer) matching " + 
+       "the given strategy name (case-insensitive). The valid strategies are 
\"offset\", \"timestamp\" and \"header\".";
+
+    public static final String COMPACTION_STRATEGY_HEADER_KEY_CONFIG = 
"compaction.strategy.header";
+    public static final String COMPACTION_STRATEGY_HEADER_KEY_DOC = "The 
header key for the compaction. Only applicable for compaction strategy header.";

Review comment:
       nit: ... for "header" compaction strategy.

##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -109,6 +109,19 @@
     public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a 
message will remain " +
         "ineligible for compaction in the log. Only applicable for logs that 
are being compacted.";
 
+    public static final String COMPACTION_STRATEGY_CONFIG = 
"compaction.strategy";
+    public static final String COMPACTION_STRATEGY_DOC = "The retention 
strategy to use when compacting the log. " +
+       "Only applicable for logs that are being compacted. " + 
+       "By default the compaction strategy is set to \"offset\" where the 
latest offset for the key is retained. " + 
+       "For \"header\" strategy, the value provided by the producer in the 
record header will be used to determine " +
+       "the latest record for the key. For \"timestamp\" strategy, the record 
tiemstamp will be used to determine the " +
+       "latest record for the key. So setting the strategy to anything other 
than \"offset\" will replace the offset " +

Review comment:
       nit: The sentence "So setting the strategy ... matching the given 
strategy name" reads a bit confusingly here. I think we only need to state 
when a change of the policy would take effect (the next time compaction 
is triggered by the log cleaner thread), and emphasize that for "timestamp" and 
"header" we would still always retain the last record.

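       One possible rewording along these lines, as a sketch only: the class 
name is hypothetical and the wording is a suggestion, not the text that ended 
up in the PR.

```java
// Hypothetical holder class; in the PR these constants live in TopicConfig.
final class CompactionStrategyDocSketch {
    static final String COMPACTION_STRATEGY_CONFIG = "compaction.strategy";

    // Suggested wording only: states when a change takes effect and that the
    // last record is still retained for the non-offset strategies.
    static final String COMPACTION_STRATEGY_DOC =
        "The retention strategy to use when compacting the log. " +
        "Only applicable for logs that are being compacted. " +
        "The default strategy is \"offset\", which retains the record with the latest offset for each key. " +
        "With the \"timestamp\" strategy the record timestamp, and with the \"header\" strategy the value " +
        "of the configured record header, determines the latest record for each key. " +
        "A change to this setting takes effect the next time the log cleaner thread triggers compaction. " +
        "With \"timestamp\" and \"header\" the last record in the log is still always retained. " +
        "Valid strategies (case-insensitive) are \"offset\", \"timestamp\" and \"header\".";
}
```
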
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
 
   /**
    * The number of bytes of space each entry uses (the number of bytes in the 
hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the 
offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set 
in the init method).
    */
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize
+
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = memory / bytesPerEntry
+  var slots: Int = memory / bytesPerEntry
+
+  /* compact strategy */
+  private var compactionStrategy: CompactionStrategy = null
+
+  /* header key for the Strategy header to look for */
+  private var headerKey: String = ""
+
+  /**
+   * Initialize the map with the topic compact strategy
+   * @param strategy The compaction strategy
+   * @param headerKey The header key if the compaction strategy is set to 
header
+   * @param cleanerThreadId The clenaer thread id
+   * @param topicPartitionName The topic partition name
+   */
+  override def init(strategy: String = Defaults.CompactionStrategy, headerKey: 
String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+    // set the log indent for the topic partition
+    this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+    info(s"Initializing OffsetMap with compaction strategy '$strategy' and 
header key '$headerKey'")
+
+    // Change the salt used for key hashing making all existing keys 
unfindable.
+    this.entries = 0
+    this.lookups = 0L
+    this.probes = 0L
+    this.lastOffset = -1L
+    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + 
bytes.limit(), 0.toByte)
+
+    this.compactionStrategy = CompactionStrategy.withName(strategy)
+    this.headerKey = headerKey.trim()
+
+    info(s"Compaction strategy set to '${this.compactionStrategy}'")
+    this.bytesPerEntry = hashSize + longByteSize + (if 
(this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+    this.slots = memory / bytesPerEntry
+  }
   
   /**
    * Associate this offset to the given key.
-   * @param key The key
-   * @param offset The offset
+   * @param record The record
+   * @return success flag
    */
-  override def put(key: ByteBuffer, offset: Long): Unit = {
+  override def put(record: Record): Boolean = {
     require(entries < slots, "Attempt to add a new entry to a full offset 
map.")
+
+    val key = record.key
+    val offset = record.offset
+    val currVersion = extractVersion(record)
+
+    lastOffset = offset
     lookups += 1
     hashInto(key, hash1)
+
     // probe until we find the first empty slot
     var attempt = 0
     var pos = positionOf(hash1, attempt)  
-    while(!isEmpty(pos)) {
+    while (!isEmpty(pos)) {
       bytes.position(pos)
       bytes.get(hash2)
-      if(Arrays.equals(hash1, hash2)) {
+      if (Arrays.equals(hash1, hash2)) {
+        // we found an existing entry, overwrite it and return (size does not 
change)
+        if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+          // read previous value by skipping offset
+          bytes.position(bytes.position() + longByteSize)
+          val foundVersion = bytes.getLong()
+          if (foundVersion > currVersion) {
+            // map already holding latest record
+            return false
+          }
+
+          // reset the position to start of offset
+          bytes.position(bytes.position() - (2*longByteSize))
+        }
+
         // we found an existing entry, overwrite it and return (size does not 
change)
         bytes.putLong(offset)
-        lastOffset = offset
-        return
+        if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+          bytes.putLong(currVersion)
+        }
+
+        return true
       }
+
       attempt += 1
       pos = positionOf(hash1, attempt)
     }
+
     // found an empty slot, update it--size grows by 1
     bytes.position(pos)
     bytes.put(hash1)
     bytes.putLong(offset)
-    lastOffset = offset
+    if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+      bytes.putLong(currVersion)
+    }
+
     entries += 1
+    true
   }
-  
+
   /**
    * Check that there is no entry at the given position
    */
   private def isEmpty(position: Int): Boolean = 
-    bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && 
bytes.getLong(position + 16) == 0
+    bytes.getLong(position) == 0 && bytes.getLong(position + longByteSize) == 
0 && bytes.getLong(position + (2*longByteSize)) == 0
 
   /**
-   * Get the offset associated with this key.
+   * Checks to see whether to retain the record or not
+   * @param record The record
+   * @return true to retain; false not to
+   */
+  override def shouldRetainRecord(record: Record): Boolean = {
+    if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+      val foundVersion = getVersion(record.key)
+      val currentVersion = extractVersion(record)
+      // use version if available & different otherwise fallback to offset

Review comment:
       +1. I think the LEO case should be captured in this function as well.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -1725,26 +2360,64 @@ class LogCleanerTest {
 }
 
 class FakeOffsetMap(val slots: Int) extends OffsetMap {
-  val map = new java.util.HashMap[String, Long]()
+  val map = new java.util.HashMap[String, Record]()
   var lastOffset = -1L
+  var isOffsetStrategy = true
+  var isTimestampStrategy = false
+  var headerKey = ""
 
   private def keyFor(key: ByteBuffer) =
     new String(Utils.readBytes(key.duplicate), StandardCharsets.UTF_8)
 
-  override def put(key: ByteBuffer, offset: Long): Unit = {
-    lastOffset = offset
-    map.put(keyFor(key), offset)
+  override def init(strategy: String = Defaults.CompactionStrategyOffset, 
headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = 
"") = {
+    this.map.clear()
+
+    this.isOffsetStrategy = 
Defaults.CompactionStrategyOffset.equalsIgnoreCase(strategy)
+    this.isTimestampStrategy = 
Defaults.CompactionStrategyTimestamp.equalsIgnoreCase(strategy)
+    this.headerKey = headerKey
   }
 
-  override def get(key: ByteBuffer): Long = {
+  override def put(record: Record): Boolean = {
+    lastOffset = record.offset
+    extractVersion(record)
+    val key = keyFor(record.key)
+    if (!isOffsetStrategy && map.containsKey(key)) {
+      val foundVersion = extractVersion(map.get(key))
+      val currVersion = extractVersion(record)
+      if (foundVersion > currVersion)
+        return false
+    }
+    map.put(key, record)
+    true
+  }
+
+  override def shouldRetainRecord(record: Record): Boolean = {
+    if (!isOffsetStrategy) {
+      val foundVersion = getVersion(record.key)
+      val currentVersion = extractVersion(record)
+      // use version if available & different otherwise fallback to offset
+      if (foundVersion != currentVersion)
+        return currentVersion >= foundVersion

Review comment:
       I thought that if the record is the last of the log, we should also 
retain it even if its version is smaller?
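
       A minimal sketch of the retention check with the last-record case 
included. The method and parameter names are hypothetical, and the fallback to 
an offset comparison when the versions are equal is an assumption based on the 
code comment in the diff, not the PR's actual implementation.

```java
// Hypothetical, simplified stand-in for shouldRetainRecord.
final class RetainSketch {
    static boolean shouldRetain(long recordOffset, long recordVersion,
                                long foundOffset, long foundVersion,
                                long lastOffsetInLog) {
        // The last record of the log is retained even if its version is smaller.
        if (recordOffset == lastOffsetInLog) {
            return true;
        }
        // Versions differ: keep the record only if it is at least as new.
        if (recordVersion != foundVersion) {
            return recordVersion >= foundVersion;
        }
        // Same version: fall back to comparing offsets (assumed behavior).
        return recordOffset >= foundOffset;
    }
}
```
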

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -867,7 +866,9 @@ private[log] class Cleaner(val id: Int,
                                   end: Long,
                                   map: OffsetMap,
                                   stats: CleanerStats): Unit = {
-    map.clear()
+    // initialize the map for the topic partition
+    map.init(log.config.compactionStrategy, 
log.config.compactionStrategyHeaderKey, this.id, log.topicPartition.toString)

Review comment:
       nit: could we just construct the `logIdent` at the caller and pass it in as 
one parameter?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -395,6 +398,8 @@ object KafkaConfig {
   val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
   val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
   val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
+  val LogCleanerCompactionStrategyProp = "log.cleaner.compaction.strategy"

Review comment:
       Similarly we can define this value as "log.cleaner." + 
TopicConfig.COMPACTION_STRATEGY_CONFIG. 
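
       For illustration, the derivation could look roughly like this. Both 
class names are hypothetical stand-ins for TopicConfig and KafkaConfig; only 
the two string values come from the diff.

```java
// Stand-in for TopicConfig: the topic-level name is defined exactly once.
final class TopicConfigSketch {
    static final String COMPACTION_STRATEGY_CONFIG = "compaction.strategy";
}

// Stand-in for KafkaConfig: the broker-level name is derived, not retyped.
final class KafkaConfigSketch {
    static final String LOG_CLEANER_COMPACTION_STRATEGY_PROP =
        "log.cleaner." + TopicConfigSketch.COMPACTION_STRATEGY_CONFIG;
}
```
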

##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
 
   /**
    * The number of bytes of space each entry uses (the number of bytes in the 
hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the 
offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set 
in the init method).
    */
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize
+
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = memory / bytesPerEntry
+  var slots: Int = memory / bytesPerEntry
+
+  /* compact strategy */
+  private var compactionStrategy: CompactionStrategy = null
+
+  /* header key for the Strategy header to look for */
+  private var headerKey: String = ""
+
+  /**
+   * Initialize the map with the topic compact strategy
+   * @param strategy The compaction strategy
+   * @param headerKey The header key if the compaction strategy is set to 
header
+   * @param cleanerThreadId The cleaner thread id
+   * @param topicPartitionName The topic partition name
+   */
+  override def init(strategy: String = Defaults.CompactionStrategyOffset, 
headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = 
"") {
+    // set the log indent for the topic partition
+    this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+
+    // Change the salt used for key hashing making all existing keys 
unfindable.
+    this.entries = 0
+    this.lookups = 0L
+    this.probes = 0L
+    this.lastOffset = -1L
+    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + 
bytes.limit(), 0.toByte)
+
+    this.compactionStrategy = CompactionStrategy.withName(strategy)
+    this.headerKey = headerKey.trim()
+
+    this.bytesPerEntry = hashSize + longByteSize + (if 
(this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+    this.slots = memory / bytesPerEntry
+
+    info(s"Initialized OffsetMap with compaction strategy 
'${this.compactionStrategy}' based on passed strategy '$strategy' and header 
key '$headerKey'")
+  }
   
   /**
    * Associate this offset to the given key.
-   * @param key The key
-   * @param offset The offset
+   * @param record The record
+   * @return success flag
    */
-  override def put(key: ByteBuffer, offset: Long): Unit = {
+  override def put(record: Record): Boolean = {
     require(entries < slots, "Attempt to add a new entry to a full offset 
map.")
+
+    val key = record.key
+    val offset = record.offset
+    val currVersion = extractVersion(record)
+
+    lastOffset = offset
     lookups += 1
     hashInto(key, hash1)
+
     // probe until we find the first empty slot
     var attempt = 0
     var pos = positionOf(hash1, attempt)  
-    while(!isEmpty(pos)) {
+    while (!isEmpty(pos)) {
       bytes.position(pos)
       bytes.get(hash2)
-      if(Arrays.equals(hash1, hash2)) {
+      if (Arrays.equals(hash1, hash2)) {
+        // we found an existing entry, overwrite it and return (size does not 
change)
+        if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+          // read previous value by skipping offset
+          bytes.position(bytes.position() + longByteSize)
+          val foundVersion = bytes.getLong()
+          if (foundVersion > currVersion) {

Review comment:
       +1.

##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
 
   /**
    * The number of bytes of space each entry uses (the number of bytes in the 
hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the 
offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set 
in the init method).
    */
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize

Review comment:
       nit: I'd suggest we initialize this and the slots below to some sentinel 
value (e.g. -1) to indicate they are not initialized yet, instead of some 
"meaningful" value, to enforce that they can only be useful after `init` is 
called (again).
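
       A rough sketch of the sentinel idea, written in Java for brevity. The 
class, the boolean `offsetStrategy` parameter and the exception on early access 
are hypothetical; the arithmetic mirrors the `init` method in the diff.

```java
// Hypothetical simplification of the sizing fields in SkimpyOffsetMap.
final class SentinelMapSketch {
    private static final int UNINITIALIZED = -1;

    private final int memory;
    private final int hashSize;
    private int bytesPerEntry = UNINITIALIZED;
    private int slots = UNINITIALIZED;

    SentinelMapSketch(int memory, int hashSize) {
        this.memory = memory;
        this.hashSize = hashSize;
    }

    // Sizes the entries: hash + offset, plus a version for non-offset strategies.
    void init(boolean offsetStrategy) {
        bytesPerEntry = hashSize + Long.BYTES + (offsetStrategy ? 0 : Long.BYTES);
        slots = memory / bytesPerEntry;
    }

    // Fails fast if the map is used before init has been called.
    int slots() {
        if (slots == UNINITIALIZED) {
            throw new IllegalStateException("init must be called before use");
        }
        return slots;
    }
}
```
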

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -114,6 +114,9 @@ object Defaults {
   val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
   val LogCleanerMinCompactionLagMs = 0L
   val LogCleanerMaxCompactionLagMs = Long.MaxValue
+  val LogCleanerCompactionStrategyOffset = "offset"

Review comment:
       The possible values are defined both in TopicConfig for per-topic 
overrides and here for the global configuration. I'd suggest we just define 
them in one place, e.g. defining the possible values in `TopicConfig`, similar to 
the viable values of `cleanup.policy`, and then in KafkaConfig here we just 
refer to the values of TopicConfig.
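
       Sketched out, assuming constant names modeled on the existing 
`CLEANUP_POLICY_COMPACT` / `CLEANUP_POLICY_DELETE` pattern. All class and 
constant names below are hypothetical; the three values come from the doc 
string in the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Stand-in for TopicConfig: the single place where the values are defined.
final class StrategyValuesSketch {
    static final String COMPACTION_STRATEGY_OFFSET = "offset";
    static final String COMPACTION_STRATEGY_TIMESTAMP = "timestamp";
    static final String COMPACTION_STRATEGY_HEADER = "header";

    static final List<String> VALID_STRATEGIES = Arrays.asList(
        COMPACTION_STRATEGY_OFFSET, COMPACTION_STRATEGY_TIMESTAMP, COMPACTION_STRATEGY_HEADER);

    // Case-insensitive check, matching the documented behavior.
    static boolean isValid(String strategy) {
        return strategy != null && VALID_STRATEGIES.contains(strategy.toLowerCase(Locale.ROOT));
    }
}

// Stand-in for the broker Defaults: refers to the value instead of repeating it.
final class BrokerDefaultsSketch {
    static final String LOG_CLEANER_COMPACTION_STRATEGY =
        StrategyValuesSketch.COMPACTION_STRATEGY_OFFSET;
}
```
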

##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
 
   /**
    * The number of bytes of space each entry uses (the number of bytes in the 
hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the 
offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set 
in the init method).
    */
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize
+
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = memory / bytesPerEntry
+  var slots: Int = memory / bytesPerEntry
+
+  /* compact strategy */
+  private var compactionStrategy: CompactionStrategy = null
+
+  /* header key for the Strategy header to look for */
+  private var headerKey: String = ""
+
+  /**
+   * Initialize the map with the topic compact strategy
+   * @param strategy The compaction strategy
+   * @param headerKey The header key if the compaction strategy is set to 
header
+   * @param cleanerThreadId The cleaner thread id
+   * @param topicPartitionName The topic partition name
+   */
+  override def init(strategy: String = Defaults.CompactionStrategyOffset, 
headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = 
"") {
+    // set the log indent for the topic partition
+    this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+
+    // Change the salt used for key hashing making all existing keys 
unfindable.
+    this.entries = 0
+    this.lookups = 0L
+    this.probes = 0L
+    this.lastOffset = -1L
+    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + 
bytes.limit(), 0.toByte)
+
+    this.compactionStrategy = CompactionStrategy.withName(strategy)
+    this.headerKey = headerKey.trim()
+
+    this.bytesPerEntry = hashSize + longByteSize + (if 
(this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+    this.slots = memory / bytesPerEntry
+
+    info(s"Initialized OffsetMap with compaction strategy 
'${this.compactionStrategy}' based on passed strategy '$strategy' and header 
key '$headerKey'")
+  }
   
   /**
    * Associate this offset to the given key.
-   * @param key The key
-   * @param offset The offset
+   * @param record The record
+   * @return success flag

Review comment:
       The returned flag does not seem to be used by the caller?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -729,6 +734,15 @@ object KafkaConfig {
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
   val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will 
remain uncompacted in the log. Only applicable for logs that are being 
compacted."
   val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will 
remain ineligible for compaction in the log. Only applicable for logs that are 
being compacted."
+  val LogCleanerCompactionStrategyDoc = "The retention strategy to use when 
compacting the log. " +
+    "Only applicable for logs that are being compacted. " + 
+    "By default the compaction strategy is set to \"offset\" where the latest 
offset for the key is retained. " + 
+    "For \"header\" strategy, the value provided by the producer in the record 
header will be used to determine " +
+    "the latest record for the key. For \"timestamp\" strategy, the record 
tiemstamp will be used to determine the " +
+    "latest record for the key. So setting the strategy to anything other than 
\"offset\" will replace the offset " +
+    "when calculating which records to retain for the value (i.e. provided by 
the producer) matching " + 

Review comment:
       I think we can just reuse the doc string in TopicConfig.

##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -172,6 +298,33 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
     lastOffset = offset
   }
 
+  /**
+   * Search for the hash of the key by repeated probing until we find the hash 
we are looking for or we find an empty slot
+   * @param key The key
+   * @return true if key found otherwise false
+   */
+  private def search(key: ByteBuffer): Boolean = {
+    lookups += 1
+    hashInto(key, hash1)
+    var attempt = 0
+    var pos = 0
+    //we need to guard against attempt integer overflow if the map is full

Review comment:
       nit: add space after `//`

##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -172,6 +298,33 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
     lastOffset = offset
   }
 
+  /**
+   * Search for the hash of the key by repeated probing until we find the hash 
we are looking for or we find an empty slot
+   * @param key The key
+   * @return true if key found otherwise false
+   */
+  private def search(key: ByteBuffer): Boolean = {

Review comment:
       Also note in the Javadoc that when the search returns true, the position 
of the ByteBuffer points to the starting position of the offset for 
that entry.
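
       To make the contract concrete, here is a simplified sketch with the 
postcondition spelled out in the Javadoc. It is not the PR's `search`: the 
linear probing, the fixed 16-byte hash and the "all-zero hash means empty 
slot" rule are assumptions made to keep the example short.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical, simplified entry layout: [hash (16 bytes)][offset (8 bytes)].
final class SearchSketch {
    static final int HASH_SIZE = 16;
    static final int ENTRY_SIZE = HASH_SIZE + Long.BYTES;

    /**
     * Probes for the given hash until it is found or an empty slot is hit.
     * When this returns true, bytes.position() points at the first byte of
     * the offset of the matching entry. When it returns false, the position
     * is unspecified.
     */
    static boolean search(ByteBuffer bytes, byte[] hash) {
        int slots = bytes.capacity() / ENTRY_SIZE;
        byte[] found = new byte[HASH_SIZE];
        int start = Math.floorMod(Arrays.hashCode(hash), slots);
        // Bounding the loop by the slot count guards against a full map.
        for (int attempt = 0; attempt < slots; attempt++) {
            int pos = ((start + attempt) % slots) * ENTRY_SIZE;
            bytes.position(pos);
            bytes.get(found); // advances the position past the hash
            if (Arrays.equals(found, hash)) {
                return true;
            }
            if (isAllZero(found)) {
                return false; // empty slot: the key is not in the map
            }
        }
        return false;
    }

    private static boolean isAllZero(byte[] data) {
        for (byte b : data) {
            if (b != 0) {
                return false;
            }
        }
        return true;
    }
}
```

       A caller can then read the offset directly with `bytes.getLong()` after a 
successful search.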

##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
 
   /**
    * The number of bytes of space each entry uses (the number of bytes in the 
hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the 
offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set 
in the init method).
    */
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize
+
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = memory / bytesPerEntry
+  var slots: Int = memory / bytesPerEntry
+
+  /* compact strategy */
+  private var compactionStrategy: CompactionStrategy = null
+
+  /* header key for the Strategy header to look for */
+  private var headerKey: String = ""
+
+  /**
+   * Initialize the map with the topic compact strategy
+   * @param strategy The compaction strategy
+   * @param headerKey The header key if the compaction strategy is set to 
header
+   * @param cleanerThreadId The clenaer thread id
+   * @param topicPartitionName The topic partition name
+   */
+  override def init(strategy: String = Defaults.CompactionStrategy, headerKey: 
String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+    // set the log indent for the topic partition
+    this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+    info(s"Initializing OffsetMap with compaction strategy '$strategy' and 
header key '$headerKey'")
+
+    // Change the salt used for key hashing making all existing keys 
unfindable.
+    this.entries = 0
+    this.lookups = 0L
+    this.probes = 0L
+    this.lastOffset = -1L
+    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + 
bytes.limit(), 0.toByte)
+
+    this.compactionStrategy = CompactionStrategy.withName(strategy)
+    this.headerKey = headerKey.trim()
+
+    info(s"Compaction strategy set to '${this.compactionStrategy}'")
+    this.bytesPerEntry = hashSize + longByteSize + (if 
(this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+    this.slots = memory / bytesPerEntry
+  }
   
   /**
    * Associate this offset to the given key.
-   * @param key The key
-   * @param offset The offset
+   * @param record The record
+   * @return success flag
    */
-  override def put(key: ByteBuffer, offset: Long): Unit = {
+  override def put(record: Record): Boolean = {
     require(entries < slots, "Attempt to add a new entry to a full offset 
map.")
+
+    val key = record.key
+    val offset = record.offset
+    val currVersion = extractVersion(record)
+
+    lastOffset = offset
     lookups += 1
     hashInto(key, hash1)
+
     // probe until we find the first empty slot
     var attempt = 0
     var pos = positionOf(hash1, attempt)  
-    while(!isEmpty(pos)) {
+    while (!isEmpty(pos)) {
       bytes.position(pos)
       bytes.get(hash2)
-      if(Arrays.equals(hash1, hash2)) {
+      if (Arrays.equals(hash1, hash2)) {
+        // we found an existing entry, overwrite it and return (size does not 
change)
+        if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+          // read previous value by skipping offset
+          bytes.position(bytes.position() + longByteSize)
+          val foundVersion = bytes.getLong()
+          if (foundVersion > currVersion) {
+            // map already holding latest record
+            return false
+          }
+
+          // reset the position to start of offset
+          bytes.position(bytes.position() - (2*longByteSize))
+        }
+
         // we found an existing entry, overwrite it and return (size does not 
change)
         bytes.putLong(offset)
-        lastOffset = offset
-        return
+        if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+          bytes.putLong(currVersion)
+        }
+
+        return true
       }
+
       attempt += 1
       pos = positionOf(hash1, attempt)
     }
+
     // found an empty slot, update it--size grows by 1
     bytes.position(pos)
     bytes.put(hash1)
     bytes.putLong(offset)
-    lastOffset = offset
+    if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+      bytes.putLong(currVersion)
+    }
+
     entries += 1
+    true
   }
-  
+
   /**
    * Check that there is no entry at the given position
    */
   private def isEmpty(position: Int): Boolean = 
-    bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && 
bytes.getLong(position + 16) == 0
+    bytes.getLong(position) == 0 && bytes.getLong(position + longByteSize) == 
0 && bytes.getLong(position + (2*longByteSize)) == 0
 
   /**
-   * Get the offset associated with this key.
+   * Checks to see whether to retain the record or not
+   * @param record The record
+   * @return true to retain; false not to
+   */
+  override def shouldRetainRecord(record: Record): Boolean = {
+    if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+      val foundVersion = getVersion(record.key)
+      val currentVersion = extractVersion(record)
+      // use version if available & different otherwise fallback to offset

Review comment:
       Also there's one optimization we can do: after we've called 
`getVersion(record.key)`, we can actually avoid a second search if we have to 
call `getOffset(..)` below in 247, since we know the offset can be read together 
with the version: maybe we can let `getVersion()` return the combo of 
"offset, version", for example.

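       A small sketch of reading both values in one pass. The `EntrySketch` 
type and `readAt` helper are hypothetical, and the sketch assumes the buffer is 
already positioned at the start of the offset and that the version is stored 
directly after it, as in the `put` method of the diff.

```java
import java.nio.ByteBuffer;

// Hypothetical value holder for the (offset, version) pair of one entry.
final class EntrySketch {
    final long offset;
    final long version;

    EntrySketch(long offset, long version) {
        this.offset = offset;
        this.version = version;
    }

    // Reads offset and version back to back, so one search serves both lookups.
    static EntrySketch readAt(ByteBuffer bytes) {
        long offset = bytes.getLong();
        long version = bytes.getLong();
        return new EntrySketch(offset, version);
    }
}
```
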



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

