guozhangwang commented on a change in pull request #8103:
URL: https://github.com/apache/kafka/pull/8103#discussion_r415156119
##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -109,6 +109,19 @@
public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
"ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+ public static final String COMPACTION_STRATEGY_CONFIG = "compaction.strategy";
+ public static final String COMPACTION_STRATEGY_DOC = "The retention strategy to use when compacting the log. " +
+ "Only applicable for logs that are being compacted. " +
+ "By default the compaction strategy is set to \"offset\" where the latest offset for the key is retained. " +
+ "For \"header\" strategy, the value provided by the producer in the record header will be used to determine " +
+ "the latest record for the key. For \"timestamp\" strategy, the record tiemstamp will be used to determine the " +
+ "latest record for the key. So setting the strategy to anything other than \"offset\" will replace the offset " +
+ "when calculating which records to retain for the value (i.e. provided by the producer) matching " +
+ "the given strategy name (case-insensitive). The valid strategies are \"offset\", \"timestamp\" and \"header\".";
+
+ public static final String COMPACTION_STRATEGY_HEADER_KEY_CONFIG = "compaction.strategy.header";
+ public static final String COMPACTION_STRATEGY_HEADER_KEY_DOC = "The header key for the compaction. Only applicable for compaction strategy header.";
Review comment:
nit: ... for "header" compaction strategy.
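The doc string promises that strategy names are matched case-insensitively and that only "offset", "timestamp" and "header" are valid. A minimal Java sketch of such a lookup follows, assuming a standalone enum; `fromName` and `storesVersion` are hypothetical names, not part of the PR. If the PR's `CompactionStrategy` is a Scala `Enumeration`, note that `withName` matches names exactly, so the input would need normalizing before the call for the case-insensitive promise to hold.

```java
import java.util.Locale;

// Hypothetical Java counterpart of the PR's CompactionStrategy; not the PR's own code.
enum CompactionStrategy {
    OFFSET, TIMESTAMP, HEADER;

    // Case-insensitive lookup: "offset", "Offset" and " OFFSET " all resolve to OFFSET.
    static CompactionStrategy fromName(String name) {
        if (name == null) {
            throw new IllegalArgumentException("compaction strategy must not be null");
        }
        String normalized = name.trim().toUpperCase(Locale.ROOT);
        for (CompactionStrategy strategy : values()) {
            if (strategy.name().equals(normalized)) {
                return strategy;
            }
        }
        throw new IllegalArgumentException("Invalid compaction strategy '" + name
                + "'; valid strategies are offset, timestamp and header");
    }

    // Non-offset strategies need an extra 8-byte version stored per map entry.
    boolean storesVersion() {
        return this != OFFSET;
    }
}
```

For example, `fromName("Timestamp")` yields `TIMESTAMP`, while an unknown name such as `"latest"` is rejected with an `IllegalArgumentException`.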
##########
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##########
@@ -109,6 +109,19 @@
public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
"ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+ public static final String COMPACTION_STRATEGY_CONFIG = "compaction.strategy";
+ public static final String COMPACTION_STRATEGY_DOC = "The retention strategy to use when compacting the log. " +
+ "Only applicable for logs that are being compacted. " +
+ "By default the compaction strategy is set to \"offset\" where the latest offset for the key is retained. " +
+ "For \"header\" strategy, the value provided by the producer in the record header will be used to determine " +
+ "the latest record for the key. For \"timestamp\" strategy, the record tiemstamp will be used to determine the " +
+ "latest record for the key. So setting the strategy to anything other than \"offset\" will replace the offset " +
Review comment:
nit: The sentence "So setting the strategy ... matching the given strategy name" reads a bit confusingly here. I think we only need to state when a change of the policy takes effect (the next time compaction is triggered by the log cleaner thread), and emphasize that for "timestamp" and "header" we would still always retain the last record.
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+ * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
- val bytesPerEntry = hashSize + 8
-
+ var bytesPerEntry = hashSize + longByteSize
+
/**
* The maximum number of entries this map can contain
*/
- val slots: Int = memory / bytesPerEntry
+ var slots: Int = memory / bytesPerEntry
+
+ /* compact strategy */
+ private var compactionStrategy: CompactionStrategy = null
+
+ /* header key for the Strategy header to look for */
+ private var headerKey: String = ""
+
+ /**
+ * Initialize the map with the topic compact strategy
+ * @param strategy The compaction strategy
+ * @param headerKey The header key if the compaction strategy is set to header
+ * @param cleanerThreadId The clenaer thread id
+ * @param topicPartitionName The topic partition name
+ */
+ override def init(strategy: String = Defaults.CompactionStrategy, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+ // set the log indent for the topic partition
+ this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+ info(s"Initializing OffsetMap with compaction strategy '$strategy' and header key '$headerKey'")
+
+ // Change the salt used for key hashing making all existing keys unfindable.
+ this.entries = 0
+ this.lookups = 0L
+ this.probes = 0L
+ this.lastOffset = -1L
+ Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
+
+ this.compactionStrategy = CompactionStrategy.withName(strategy)
+ this.headerKey = headerKey.trim()
+
+ info(s"Compaction strategy set to '${this.compactionStrategy}'")
+ this.bytesPerEntry = hashSize + longByteSize + (if (this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+ this.slots = memory / bytesPerEntry
+ }
/**
* Associate this offset to the given key.
- * @param key The key
- * @param offset The offset
+ * @param record The record
+ * @return success flag
*/
- override def put(key: ByteBuffer, offset: Long): Unit = {
+ override def put(record: Record): Boolean = {
require(entries < slots, "Attempt to add a new entry to a full offset map.")
+
+ val key = record.key
+ val offset = record.offset
+ val currVersion = extractVersion(record)
+
+ lastOffset = offset
lookups += 1
hashInto(key, hash1)
+
// probe until we find the first empty slot
var attempt = 0
var pos = positionOf(hash1, attempt)
- while(!isEmpty(pos)) {
+ while (!isEmpty(pos)) {
bytes.position(pos)
bytes.get(hash2)
- if(Arrays.equals(hash1, hash2)) {
+ if (Arrays.equals(hash1, hash2)) {
+ // we found an existing entry, overwrite it and return (size does not change)
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ // read previous value by skipping offset
+ bytes.position(bytes.position() + longByteSize)
+ val foundVersion = bytes.getLong()
+ if (foundVersion > currVersion) {
+ // map already holding latest record
+ return false
+ }
+
+ // reset the position to start of offset
+ bytes.position(bytes.position() - (2*longByteSize))
+ }
+
// we found an existing entry, overwrite it and return (size does not change)
bytes.putLong(offset)
- lastOffset = offset
- return
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ bytes.putLong(currVersion)
+ }
+
+ return true
}
+
attempt += 1
pos = positionOf(hash1, attempt)
}
+
// found an empty slot, update it--size grows by 1
bytes.position(pos)
bytes.put(hash1)
bytes.putLong(offset)
- lastOffset = offset
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ bytes.putLong(currVersion)
+ }
+
entries += 1
+ true
}
-
+
/**
* Check that there is no entry at the given position
*/
private def isEmpty(position: Int): Boolean =
- bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0
+ bytes.getLong(position) == 0 && bytes.getLong(position + longByteSize) == 0 && bytes.getLong(position + (2*longByteSize)) == 0
/**
- * Get the offset associated with this key.
+ * Checks to see whether to retain the record or not
+ * @param record The record
+ * @return true to retain; false not to
+ */
+ override def shouldRetainRecord(record: Record): Boolean = {
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ val foundVersion = getVersion(record.key)
+ val currentVersion = extractVersion(record)
+ // use version if available & different otherwise fallback to offset
Review comment:
+1. I think the log end offset (LEO) case should be captured in this function as well.
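One way to fold the log end offset case into `shouldRetainRecord` is sketched below in Java, with a `HashMap` in place of the hashed byte buffer. It assumes that "the last record of the log" can be identified by the map's `lastOffset`, i.e. the offset of the most recent `put`, which holds as long as records are fed in offset order; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical HashMap-backed stand-in for the offset map under a non-offset strategy.
final class VersionedRetention {
    // key -> {offset, version} of the record that currently wins for that key
    private final Map<String, long[]> latest = new HashMap<>();
    // offset of the most recent put(); with records fed in offset order, the last record scanned
    private long lastOffset = -1L;

    void put(String key, long offset, long version) {
        lastOffset = offset; // tracked for every record, including ones that lose
        long[] found = latest.get(key);
        if (found == null || version >= found[1]) {
            latest.put(key, new long[] {offset, version});
        }
    }

    boolean shouldRetain(String key, long offset, long version) {
        if (offset == lastOffset) {
            return true; // LEO case: never drop the last record, whatever its version
        }
        long[] found = latest.get(key);
        if (found == null) {
            return true; // nothing newer is known for this key
        }
        if (version != found[1]) {
            return version > found[1];
        }
        return offset >= found[0]; // equal versions: fall back to the offset
    }
}
```

With `k@0` (version 5) followed by `k@1` (version 3), both records survive: offset 0 because it carries the highest version, offset 1 because it is the last record.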
##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -1725,26 +2360,64 @@ class LogCleanerTest {
}
class FakeOffsetMap(val slots: Int) extends OffsetMap {
- val map = new java.util.HashMap[String, Long]()
+ val map = new java.util.HashMap[String, Record]()
var lastOffset = -1L
+ var isOffsetStrategy = true
+ var isTimestampStrategy = false
+ var headerKey = ""
private def keyFor(key: ByteBuffer) =
new String(Utils.readBytes(key.duplicate), StandardCharsets.UTF_8)
- override def put(key: ByteBuffer, offset: Long): Unit = {
- lastOffset = offset
- map.put(keyFor(key), offset)
+ override def init(strategy: String = Defaults.CompactionStrategyOffset, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") = {
+ this.map.clear()
+
+ this.isOffsetStrategy = Defaults.CompactionStrategyOffset.equalsIgnoreCase(strategy)
+ this.isTimestampStrategy = Defaults.CompactionStrategyTimestamp.equalsIgnoreCase(strategy)
+ this.headerKey = headerKey
}
- override def get(key: ByteBuffer): Long = {
+ override def put(record: Record): Boolean = {
+ lastOffset = record.offset
+ extractVersion(record)
+ val key = keyFor(record.key)
+ if (!isOffsetStrategy && map.containsKey(key)) {
+ val foundVersion = extractVersion(map.get(key))
+ val currVersion = extractVersion(record)
+ if (foundVersion > currVersion)
+ return false
+ }
+ map.put(key, record)
+ true
+ }
+
+ override def shouldRetainRecord(record: Record): Boolean = {
+ if (!isOffsetStrategy) {
+ val foundVersion = getVersion(record.key)
+ val currentVersion = extractVersion(record)
+ // use version if available & different otherwise fallback to offset
+ if (foundVersion != currentVersion)
+ return currentVersion >= foundVersion
Review comment:
I thought that if the record is the last one in the log, we should also retain it even if its version is smaller?
##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -867,7 +866,9 @@ private[log] class Cleaner(val id: Int,
end: Long,
map: OffsetMap,
stats: CleanerStats): Unit = {
- map.clear()
+ // initialize the map for the topic partition
+ map.init(log.config.compactionStrategy, log.config.compactionStrategyHeaderKey, this.id, log.topicPartition.toString)
Review comment:
nit: could we just construct the log ident at the caller and pass it in as one parameter?
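A sketch of that suggestion, in Java for brevity: the cleaner formats the prefix once and `init` takes it as a single argument instead of `cleanerThreadId` and `topicPartitionName`. `OffsetMapInitSketch`, `logPrefixFor` and `describe` are hypothetical names; the prefix format is copied from the PR's `logIdent` assignment.

```java
// Hypothetical sketch: the caller builds the log prefix once and init() takes one string.
final class OffsetMapInitSketch {
    private String logPrefix = "";
    private String strategy = "offset";
    private String headerKey = "";

    void init(String strategy, String headerKey, String logPrefix) {
        this.strategy = strategy;
        this.headerKey = headerKey.trim();
        this.logPrefix = logPrefix;
    }

    // Stands in for a log line that would be emitted with the prefix.
    String describe() {
        return logPrefix + "compaction strategy '" + strategy + "', header key '" + headerKey + "'";
    }

    // What the Cleaner could compute at its call site (format copied from the PR's logIdent).
    static String logPrefixFor(int cleanerId, String topicPartition) {
        return "[OffsetMap-" + cleanerId + " " + topicPartition + "]: ";
    }
}
```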
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -395,6 +398,8 @@ object KafkaConfig {
val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
+ val LogCleanerCompactionStrategyProp = "log.cleaner.compaction.strategy"
Review comment:
Similarly we can define this value as "log.cleaner." +
TopicConfig.COMPACTION_STRATEGY_CONFIG.
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+ * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
- val bytesPerEntry = hashSize + 8
-
+ var bytesPerEntry = hashSize + longByteSize
+
/**
* The maximum number of entries this map can contain
*/
- val slots: Int = memory / bytesPerEntry
+ var slots: Int = memory / bytesPerEntry
+
+ /* compact strategy */
+ private var compactionStrategy: CompactionStrategy = null
+
+ /* header key for the Strategy header to look for */
+ private var headerKey: String = ""
+
+ /**
+ * Initialize the map with the topic compact strategy
+ * @param strategy The compaction strategy
+ * @param headerKey The header key if the compaction strategy is set to header
+ * @param cleanerThreadId The cleaner thread id
+ * @param topicPartitionName The topic partition name
+ */
+ override def init(strategy: String = Defaults.CompactionStrategyOffset, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+ // set the log indent for the topic partition
+ this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+
+ // Change the salt used for key hashing making all existing keys unfindable.
+ this.entries = 0
+ this.lookups = 0L
+ this.probes = 0L
+ this.lastOffset = -1L
+ Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
+
+ this.compactionStrategy = CompactionStrategy.withName(strategy)
+ this.headerKey = headerKey.trim()
+
+ this.bytesPerEntry = hashSize + longByteSize + (if (this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+ this.slots = memory / bytesPerEntry
+
+ info(s"Initialized OffsetMap with compaction strategy '${this.compactionStrategy}' based on passed strategy '$strategy' and header key '$headerKey'")
+ }
/**
* Associate this offset to the given key.
- * @param key The key
- * @param offset The offset
+ * @param record The record
+ * @return success flag
*/
- override def put(key: ByteBuffer, offset: Long): Unit = {
+ override def put(record: Record): Boolean = {
require(entries < slots, "Attempt to add a new entry to a full offset map.")
+
+ val key = record.key
+ val offset = record.offset
+ val currVersion = extractVersion(record)
+
+ lastOffset = offset
lookups += 1
hashInto(key, hash1)
+
// probe until we find the first empty slot
var attempt = 0
var pos = positionOf(hash1, attempt)
- while(!isEmpty(pos)) {
+ while (!isEmpty(pos)) {
bytes.position(pos)
bytes.get(hash2)
- if(Arrays.equals(hash1, hash2)) {
+ if (Arrays.equals(hash1, hash2)) {
+ // we found an existing entry, overwrite it and return (size does not change)
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ // read previous value by skipping offset
+ bytes.position(bytes.position() + longByteSize)
+ val foundVersion = bytes.getLong()
+ if (foundVersion > currVersion) {
Review comment:
+1.
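For reference, the byte-level bookkeeping behind that check can be sketched in Java for a single slot laid out as `[hash][offset][version]`. After reading the hash, the position sits at the offset field; skipping the offset and reading the version leaves the position two longs past the offset, which is why the PR rewinds by `2*longByteSize`. The class name, the fixed 16-byte hash and the single-slot, no-probing simplification (emptiness judged by an all-zero hash, unlike the PR's `isEmpty`) are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical single-slot demo of the [hash][offset][version] entry layout (no probing).
final class EntryLayoutDemo {
    static final int HASH_SIZE = 16;  // e.g. an MD5 digest
    static final int LONG_BYTES = 8;

    // Returns false when the slot already holds a newer version for this hash.
    static boolean putVersioned(ByteBuffer bytes, int slotPos, byte[] hash, long offset, long version) {
        byte[] stored = new byte[HASH_SIZE];
        bytes.position(slotPos);
        bytes.get(stored);                                     // position: start of offset field
        if (Arrays.equals(stored, hash)) {
            bytes.position(bytes.position() + LONG_BYTES);     // skip the stored offset
            long foundVersion = bytes.getLong();               // position: end of version field
            if (foundVersion > version) {
                return false;                                  // map already holds the latest record
            }
            bytes.position(bytes.position() - 2 * LONG_BYTES); // rewind to the offset field
        } else if (Arrays.equals(stored, new byte[HASH_SIZE])) {
            bytes.position(slotPos);
            bytes.put(hash);                                   // empty slot: claim it
        } else {
            throw new IllegalStateException("slot holds another key; a real map would probe");
        }
        bytes.putLong(offset);
        bytes.putLong(version);
        return true;
    }
}
```

An equal version is not rejected, so a later offset overwrites the entry, matching the PR's strict `foundVersion > currVersion` test.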
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+ * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
- val bytesPerEntry = hashSize + 8
-
+ var bytesPerEntry = hashSize + longByteSize
Review comment:
nit: I'd suggest we initialize this and the slots below to a sentinel value (e.g. -1) to indicate that they are not initialized yet, instead of a "meaningful" value, to enforce that they are only usable after `init` is called (again).
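A Java sketch of the sentinel approach, with hypothetical names: both fields start at -1 and any read before `init` fails fast. It also makes the memory cost of the extra version visible: with a 16-byte MD5 hash an entry grows from 24 to 32 bytes under a non-offset strategy, so a 1 MiB buffer drops from 43,690 to 32,768 slots, a quarter fewer keys.

```java
// Hypothetical sizing helper: bytesPerEntry and slots hold the sentinel -1 until init().
final class SlotSizing {
    private static final int LONG_BYTES = Long.BYTES; // 8

    private final int memory;
    private final int hashSize;
    private int bytesPerEntry = -1;
    private int slots = -1;

    SlotSizing(int memory, int hashSize) {
        this.memory = memory;
        this.hashSize = hashSize;
    }

    // hash + 8-byte offset, plus an 8-byte version for non-offset strategies
    void init(boolean storesVersion) {
        bytesPerEntry = hashSize + LONG_BYTES + (storesVersion ? LONG_BYTES : 0);
        slots = memory / bytesPerEntry;
    }

    private void ensureInitialized() {
        if (slots < 0 || bytesPerEntry < 0) {
            throw new IllegalStateException("init() must be called before the map is used");
        }
    }

    int slots() {
        ensureInitialized();
        return slots;
    }

    int bytesPerEntry() {
        ensureInitialized();
        return bytesPerEntry;
    }
}
```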
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -114,6 +114,9 @@ object Defaults {
val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
val LogCleanerMinCompactionLagMs = 0L
val LogCleanerMaxCompactionLagMs = Long.MaxValue
+ val LogCleanerCompactionStrategyOffset = "offset"
Review comment:
The possible values are defined both in TopicConfig for per-topic overrides and here for the global configuration. I'd suggest we define them in one place, e.g. define the possible values in `TopicConfig`, similar to the valid values of `cleanup.policy`, and then in KafkaConfig here we just refer to the values in TopicConfig.
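A Java sketch of that layout, following the `cleanup.policy` pattern (`TopicConfig.CLEANUP_POLICY_COMPACT`, `TopicConfig.CLEANUP_POLICY_DELETE`): the values are declared once and the broker side only refers to them, including a `"log.cleaner." + ...` property name. `TopicConfigValues`, `BrokerDefaults` and every constant name below are hypothetical; `isValidStrategy` lowercases its input to honor the case-insensitive matching the doc string describes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical TopicConfig-style holder: the valid values are declared exactly once.
final class TopicConfigValues {
    static final String COMPACTION_STRATEGY_CONFIG = "compaction.strategy";
    static final String COMPACTION_STRATEGY_OFFSET = "offset";
    static final String COMPACTION_STRATEGY_TIMESTAMP = "timestamp";
    static final String COMPACTION_STRATEGY_HEADER = "header";
    static final List<String> COMPACTION_STRATEGIES = Collections.unmodifiableList(Arrays.asList(
            COMPACTION_STRATEGY_OFFSET, COMPACTION_STRATEGY_TIMESTAMP, COMPACTION_STRATEGY_HEADER));

    private TopicConfigValues() {}
}

// Broker-side names and defaults only refer to the topic-level constants.
final class BrokerDefaults {
    static final String LOG_CLEANER_COMPACTION_STRATEGY_PROP =
            "log.cleaner." + TopicConfigValues.COMPACTION_STRATEGY_CONFIG;
    static final String LOG_CLEANER_COMPACTION_STRATEGY = TopicConfigValues.COMPACTION_STRATEGY_OFFSET;

    private BrokerDefaults() {}

    // Case-insensitive membership check against the single list of valid values.
    static boolean isValidStrategy(String value) {
        return value != null
                && TopicConfigValues.COMPACTION_STRATEGIES.contains(value.trim().toLowerCase(Locale.ROOT));
    }
}
```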
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+ * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
- val bytesPerEntry = hashSize + 8
-
+ var bytesPerEntry = hashSize + longByteSize
+
/**
* The maximum number of entries this map can contain
*/
- val slots: Int = memory / bytesPerEntry
+ var slots: Int = memory / bytesPerEntry
+
+ /* compact strategy */
+ private var compactionStrategy: CompactionStrategy = null
+
+ /* header key for the Strategy header to look for */
+ private var headerKey: String = ""
+
+ /**
+ * Initialize the map with the topic compact strategy
+ * @param strategy The compaction strategy
+ * @param headerKey The header key if the compaction strategy is set to header
+ * @param cleanerThreadId The cleaner thread id
+ * @param topicPartitionName The topic partition name
+ */
+ override def init(strategy: String = Defaults.CompactionStrategyOffset, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+ // set the log indent for the topic partition
+ this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+
+ // Change the salt used for key hashing making all existing keys unfindable.
+ this.entries = 0
+ this.lookups = 0L
+ this.probes = 0L
+ this.lastOffset = -1L
+ Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
+
+ this.compactionStrategy = CompactionStrategy.withName(strategy)
+ this.headerKey = headerKey.trim()
+
+ this.bytesPerEntry = hashSize + longByteSize + (if (this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+ this.slots = memory / bytesPerEntry
+
+ info(s"Initialized OffsetMap with compaction strategy '${this.compactionStrategy}' based on passed strategy '$strategy' and header key '$headerKey'")
+ }
/**
* Associate this offset to the given key.
- * @param key The key
- * @param offset The offset
+ * @param record The record
+ * @return success flag
Review comment:
The returned flag does not seem to be used by the caller?
##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -729,6 +734,15 @@ object KafkaConfig {
val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
+ val LogCleanerCompactionStrategyDoc = "The retention strategy to use when compacting the log. " +
+ "Only applicable for logs that are being compacted. " +
+ "By default the compaction strategy is set to \"offset\" where the latest offset for the key is retained. " +
+ "For \"header\" strategy, the value provided by the producer in the record header will be used to determine " +
+ "the latest record for the key. For \"timestamp\" strategy, the record tiemstamp will be used to determine the " +
+ "latest record for the key. So setting the strategy to anything other than \"offset\" will replace the offset " +
+ "when calculating which records to retain for the value (i.e. provided by the producer) matching " +
Review comment:
I think we can just reuse the doc string in TopicConfig.
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -172,6 +298,33 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
lastOffset = offset
}
+ /**
+ * Search for the hash of the key by repeated probing until we find the hash we are looking for or we find an empty slot
+ * @param key The key
+ * @return true if key found otherwise false
+ */
+ private def search(key: ByteBuffer): Boolean = {
+ lookups += 1
+ hashInto(key, hash1)
+ var attempt = 0
+ var pos = 0
+ //we need to guard against attempt integer overflow if the map is full
Review comment:
nit: add space after `//`
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -172,6 +298,33 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
lastOffset = offset
}
+ /**
+ * Search for the hash of the key by repeated probing until we find the hash we are looking for or we find an empty slot
+ * @param key The key
+ * @return true if key found otherwise false
+ */
+ private def search(key: ByteBuffer): Boolean = {
Review comment:
Also add to the Javadoc that when the search returns true, the position of the ByteBuffer points to the start of the offset for that entry.
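A Java sketch of the contract described here, on a simplified open-addressing table: when `search` returns true, `bytes.position()` sits on the first byte of the entry's offset, so callers can `getLong()` or `putLong()` directly. The probe sequence (linear probing from `Arrays.hashCode`) is a stand-in, not SkimpyOffsetMap's actual one, and every name is hypothetical. Bounding the probe loop by the slot count also sidesteps the `attempt` overflow mentioned in the code comment.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical open-addressing table; entries are [hash][8-byte offset].
final class ProbingTable {
    private final ByteBuffer bytes;
    private final int hashSize;
    private final int bytesPerEntry;
    private final int slots;

    ProbingTable(int slots, int hashSize) {
        this.slots = slots;
        this.hashSize = hashSize;
        this.bytesPerEntry = hashSize + Long.BYTES;
        this.bytes = ByteBuffer.allocate(slots * bytesPerEntry);
    }

    // Simplified linear probing; SkimpyOffsetMap derives its probe sequence differently.
    private int positionOf(byte[] hash, int attempt) {
        int start = Math.floorMod(Arrays.hashCode(hash), slots);
        return ((start + attempt) % slots) * bytesPerEntry;
    }

    // A slot is empty while its hash bytes are all zero (keys are assumed to hash to non-zero bytes).
    private boolean isEmpty(int pos) {
        for (int i = 0; i < hashSize; i++) {
            if (bytes.get(pos + i) != 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Probes until the hash or an empty slot is found, for at most slots attempts.
     * When this returns true, bytes.position() points at the first byte of the entry's offset.
     */
    boolean search(byte[] hash) {
        byte[] stored = new byte[hashSize];
        for (int attempt = 0; attempt < slots; attempt++) {
            int pos = positionOf(hash, attempt);
            if (isEmpty(pos)) {
                return false;
            }
            bytes.position(pos);
            bytes.get(stored); // leaves the position at the offset field
            if (Arrays.equals(stored, hash)) {
                return true;
            }
        }
        return false; // full table without a match
    }

    void put(byte[] hash, long offset) {
        if (search(hash)) {
            bytes.putLong(offset); // relies on the position contract of search()
            return;
        }
        for (int attempt = 0; attempt < slots; attempt++) {
            int pos = positionOf(hash, attempt);
            if (isEmpty(pos)) {
                bytes.position(pos);
                bytes.put(hash);
                bytes.putLong(offset);
                return;
            }
        }
        throw new IllegalStateException("Attempt to add a new entry to a full table");
    }

    long getOffset(byte[] hash) {
        return search(hash) ? bytes.getLong() : -1L;
    }
}
```

Both `put` and `getOffset` lean on the contract: a hit leaves the buffer positioned for `putLong` or `getLong` without recomputing the slot.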
##########
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##########
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+ * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
- val bytesPerEntry = hashSize + 8
-
+ var bytesPerEntry = hashSize + longByteSize
+
/**
* The maximum number of entries this map can contain
*/
- val slots: Int = memory / bytesPerEntry
+ var slots: Int = memory / bytesPerEntry
+
+ /* compact strategy */
+ private var compactionStrategy: CompactionStrategy = null
+
+ /* header key for the Strategy header to look for */
+ private var headerKey: String = ""
+
+ /**
+ * Initialize the map with the topic compact strategy
+ * @param strategy The compaction strategy
+ * @param headerKey The header key if the compaction strategy is set to header
+ * @param cleanerThreadId The clenaer thread id
+ * @param topicPartitionName The topic partition name
+ */
+ override def init(strategy: String = Defaults.CompactionStrategy, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+ // set the log indent for the topic partition
+ this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+ info(s"Initializing OffsetMap with compaction strategy '$strategy' and header key '$headerKey'")
+
+ // Change the salt used for key hashing making all existing keys unfindable.
+ this.entries = 0
+ this.lookups = 0L
+ this.probes = 0L
+ this.lastOffset = -1L
+ Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
+
+ this.compactionStrategy = CompactionStrategy.withName(strategy)
+ this.headerKey = headerKey.trim()
+
+ info(s"Compaction strategy set to '${this.compactionStrategy}'")
+ this.bytesPerEntry = hashSize + longByteSize + (if (this.compactionStrategy == CompactionStrategy.OFFSET) 0 else longByteSize)
+ this.slots = memory / bytesPerEntry
+ }
/**
* Associate this offset to the given key.
- * @param key The key
- * @param offset The offset
+ * @param record The record
+ * @return success flag
*/
- override def put(key: ByteBuffer, offset: Long): Unit = {
+ override def put(record: Record): Boolean = {
require(entries < slots, "Attempt to add a new entry to a full offset map.")
+
+ val key = record.key
+ val offset = record.offset
+ val currVersion = extractVersion(record)
+
+ lastOffset = offset
lookups += 1
hashInto(key, hash1)
+
// probe until we find the first empty slot
var attempt = 0
var pos = positionOf(hash1, attempt)
- while(!isEmpty(pos)) {
+ while (!isEmpty(pos)) {
bytes.position(pos)
bytes.get(hash2)
- if(Arrays.equals(hash1, hash2)) {
+ if (Arrays.equals(hash1, hash2)) {
+ // we found an existing entry, overwrite it and return (size does not change)
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ // read previous value by skipping offset
+ bytes.position(bytes.position() + longByteSize)
+ val foundVersion = bytes.getLong()
+ if (foundVersion > currVersion) {
+ // map already holding latest record
+ return false
+ }
+
+ // reset the position to start of offset
+ bytes.position(bytes.position() - (2*longByteSize))
+ }
+
// we found an existing entry, overwrite it and return (size does not change)
bytes.putLong(offset)
- lastOffset = offset
- return
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ bytes.putLong(currVersion)
+ }
+
+ return true
}
+
attempt += 1
pos = positionOf(hash1, attempt)
}
+
// found an empty slot, update it--size grows by 1
bytes.position(pos)
bytes.put(hash1)
bytes.putLong(offset)
- lastOffset = offset
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ bytes.putLong(currVersion)
+ }
+
entries += 1
+ true
}
-
+
/**
* Check that there is no entry at the given position
*/
private def isEmpty(position: Int): Boolean =
- bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0
+ bytes.getLong(position) == 0 && bytes.getLong(position + longByteSize) == 0 && bytes.getLong(position + (2*longByteSize)) == 0
/**
- * Get the offset associated with this key.
+ * Checks to see whether to retain the record or not
+ * @param record The record
+ * @return true to retain; false not to
+ */
+ override def shouldRetainRecord(record: Record): Boolean = {
+ if (this.compactionStrategy != CompactionStrategy.OFFSET) {
+ val foundVersion = getVersion(record.key)
+ val currentVersion = extractVersion(record)
+ // use version if available & different otherwise fallback to offset
Review comment:
Also, there's one optimization we can do: after we've called `getVersion(record.key)`, we can avoid a second search if we have to call `getOffset(..)` below at line 247, since the offset can be read together with the version. For example, we could let `getVersion()` return the "offset, version" pair.
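A Java sketch of that optimization, with a `HashMap` standing in for the probed buffer: one lookup returns both fields as a small value object, and the offset fallback reuses it instead of searching again. `OffsetAndVersion`, `getOffsetAndVersion` and the `lookups` counter are hypothetical names used only to make the saving observable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value object: both fields come back from one lookup.
final class OffsetAndVersion {
    final long offset;
    final long version;

    OffsetAndVersion(long offset, long version) {
        this.offset = offset;
        this.version = version;
    }
}

// HashMap stand-in for the probed byte buffer; the lookups counter tracks searches.
final class ComboLookupMap {
    private final Map<String, OffsetAndVersion> entries = new HashMap<>();
    private int lookups = 0;

    void put(String key, long offset, long version) {
        OffsetAndVersion found = entries.get(key);
        if (found == null || version >= found.version) {
            entries.put(key, new OffsetAndVersion(offset, version));
        }
    }

    // One search returns offset and version together; null if the key is absent.
    OffsetAndVersion getOffsetAndVersion(String key) {
        lookups++;
        return entries.get(key);
    }

    boolean shouldRetain(String key, long offset, long version) {
        OffsetAndVersion found = getOffsetAndVersion(key);
        if (found == null) {
            return true;
        }
        if (found.version != version) {
            return version > found.version;
        }
        return offset >= found.offset; // offset fallback, no second search needed
    }

    int lookups() {
        return lookups;
    }
}
```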
----------------------------------------------------------------
This is an automated message from the Apache Git Service.