dsmiley commented on code in PR #2548: URL: https://github.com/apache/solr/pull/2548#discussion_r1663245696
########## solr/core/src/java/org/apache/solr/update/UpdateLocks.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.util.BytesRef; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.util.IOFunction; + +/** + * Locks associated with updates in connection with the {@link UpdateLog}. + * + * @lucene.internal + */ +public class UpdateLocks { + // names are legacy oriented; TODO rename and use EnvUtils + private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = Review Comment: This system property was never publicized, so I should simply rename it and use EnvUtils.
Perhaps "solr.updateLog.docLockTimeoutMs" as the new name. ########## solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java: ########## @@ -168,32 +157,8 @@ private int runCommands( SolrQueryRequest req, Function<DistributedUpdateProcessor, Boolean> function) throws IOException { - try (DistributedUpdateProcessor processor = Review Comment: This test required changes because of the underlying details, but the test is still doing its job. The mocks were removed. ########## solr/core/src/java/org/apache/solr/update/UpdateLog.java: ########## @@ -381,19 +380,20 @@ public void init(PluginInfo info) { numRecordsToKeep = objToInt(info.initArgs.get("numRecordsToKeep"), 100); maxNumLogsToKeep = objToInt(info.initArgs.get("maxNumLogsToKeep"), 10); - numVersionBuckets = objToInt(info.initArgs.get("numVersionBuckets"), 65536); - if (numVersionBuckets <= 0) - throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, - "Number of version buckets must be greater than 0!"); + if (info.initArgs.get("numVersionBuckets") != null) { + log.warn("numVersionBuckets is obsolete"); + } + + int timeoutMs = + objToInt(info.initArgs.get("versionBucketLockTimeoutMs"), UpdateLocks.DEFAULT_TIMEOUT); Review Comment: This init arg should be renamed, maybe to "docLockTimeoutMs", with a warning for the old name. ########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -335,209 +330,199 @@ protected boolean versionAdd(AddUpdateCommand cmd) throws IOException { leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, versionOnUpdate); boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; - VersionBucket bucket = vinfo.bucket(bucketHash); - long dependentVersionFound = -1; // if this is an in-place update, check and wait if we should be waiting for a previous update // (on which this update depends), before entering the synchronized block if (!leaderLogic && cmd.isInPlaceUpdate()) { - 
dependentVersionFound = - waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket); + dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync); if (dependentVersionFound == -1) { // it means the document has been deleted by now at the leader. drop this update return true; } } - vinfo.lockForUpdate(); Review Comment: moved these to a detail of UpdateLocks. runWithLock ########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -652,35 +631,35 @@ private long doWaitForDependentUpdates( AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, - VersionBucket bucket, + Condition condition, TimeOut waitTimeout) { - long lastFoundVersion; - try { - Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); - lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion; + Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); + long lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion; - if (Math.abs(lastFoundVersion) < cmd.prevVersion) { - if (log.isDebugEnabled()) { - log.debug( - "Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}", - (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), - cmd.prevVersion, - lastFoundVersion, - isReplayOrPeersync, - cmd.getPrintableId()); - } + if (Math.abs(lastFoundVersion) < cmd.prevVersion) { + if (log.isDebugEnabled()) { + log.debug( + "Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}", + (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), + cmd.prevVersion, + lastFoundVersion, + isReplayOrPeersync, + cmd.getPrintableId()); } + } - while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) { - long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS); - if (timeLeftInNanos > 0) { // 0 means: wait forever until notified, but we don't want that. 
- bucket.awaitNanos(timeLeftInNanos); + while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) { + long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS); + if (timeLeftInNanos > 0) { // 0 means: wait forever until notified, but we don't want that. + try { + condition.await(timeLeftInNanos, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } - lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId()); - lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion; } - } finally { - bucket.unlock(); Review Comment: again; moved this elsewhere for better lock balancing; allowed removing an indentation level ########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -1089,94 +1057,89 @@ private boolean doVersionDelete( long signedVersionOnUpdate, boolean isReplayOrPeersync, boolean leaderLogic, - boolean forwardedFromCollection, - VersionBucket bucket) + boolean forwardedFromCollection) Review Comment: again, was able to balance locks and remove an indentation level. Not clear in GitHub. 
########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -335,209 +330,199 @@ protected boolean versionAdd(AddUpdateCommand cmd) throws IOException { leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, versionOnUpdate); boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; - VersionBucket bucket = vinfo.bucket(bucketHash); - long dependentVersionFound = -1; // if this is an in-place update, check and wait if we should be waiting for a previous update // (on which this update depends), before entering the synchronized block if (!leaderLogic && cmd.isInPlaceUpdate()) { - dependentVersionFound = - waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket); + dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync); if (dependentVersionFound == -1) { // it means the document has been deleted by now at the leader. drop this update return true; } } - vinfo.lockForUpdate(); - try { - long finalVersionOnUpdate = versionOnUpdate; - return bucket.runWithLock( - vinfo.getVersionBucketLockTimeoutMs(), - () -> - doVersionAdd( + final long finalVersionOnUpdate = versionOnUpdate; + return getUpdateLocks() + .runWithLock( + cmd.getIndexedId(), + (Condition condition) -> { + // just in case anyone is waiting let them know that we have a new update + // we obtain the version when synchronized and then do the add so we can ensure that + // if version1 < version2 then version1 is actually added before version2. + + // even if we don't store the version field, synchronizing + // will enable us to know what version happened first, and thus enable + // realtime-get to work reliably. + // TODO: if versions aren't stored, do we need to set on the cmd anyway for some + // reason? 
+ // there may be other reasons in the future for a version on the commands + condition.signalAll(); + + return doVersionAdd( cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, - forwardedFromCollection, - bucket)); - } finally { - vinfo.unlockForUpdate(); - } + forwardedFromCollection); + }); } private boolean doVersionAdd( AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, boolean leaderLogic, - boolean forwardedFromCollection, - VersionBucket bucket) + boolean forwardedFromCollection) throws IOException { - try { - BytesRef idBytes = cmd.getIndexedId(); - bucket.signalAll(); - // just in case anyone is waiting let them know that we have a new update - // we obtain the version when synchronized and then do the add so we can ensure that - // if version1 < version2 then version1 is actually added before version2. - - // even if we don't store the version field, synchronizing on the bucket - // will enable us to know what version happened first, and thus enable - // realtime-get to work reliably. - // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason? 
- // there may be other reasons in the future for a version on the commands - - if (versionsStored) { - - if (leaderLogic) { - - if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { - // forwarded from a collection but we are not buffering so strip original version and - // apply our own - // see SOLR-5308 - if (log.isInfoEnabled()) { - log.info("Removing version field from doc: {}", cmd.getPrintableId()); - } - cmd.solrDoc.remove(CommonParams.VERSION_FIELD); - versionOnUpdate = 0; - } + BytesRef idBytes = cmd.getIndexedId(); - getUpdatedDocument(cmd, versionOnUpdate); + if (versionsStored) { - // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 - if (forwardedFromCollection - && ulog.getState() != UpdateLog.State.ACTIVE - && isReplayOrPeersync == false) { - // we're not in an active state, and this update isn't from a replay, so buffer it. - if (log.isInfoEnabled()) { - log.info( - "Leader logic applied but update log is buffering: {}", cmd.getPrintableId()); - } - cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.add(cmd); - return true; + if (leaderLogic) { + + if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { + // forwarded from a collection but we are not buffering so strip original version and + // apply our own + // see SOLR-5308 + if (log.isInfoEnabled()) { + log.info("Removing version field from doc: {}", cmd.getPrintableId()); } + cmd.solrDoc.remove(CommonParams.VERSION_FIELD); + versionOnUpdate = 0; + } - if (versionOnUpdate != 0) { - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - long foundVersion = lastVersion == null ? -1 : lastVersion; - if (versionOnUpdate == foundVersion - || (versionOnUpdate < 0 && foundVersion < 0) - || (versionOnUpdate == 1 && foundVersion > 0)) { - // we're ok if versions match, or if both are negative (all missing docs are equal), - // or if cmd specified it must exist (versionOnUpdate==1) and it does. 
- } else { - if (cmd.getReq().getParams().getBool(CommonParams.FAIL_ON_VERSION_CONFLICTS, true) - == false) { - return true; - } + getUpdatedDocument(cmd, versionOnUpdate); - throw new SolrException( - ErrorCode.CONFLICT, - "version conflict for " - + cmd.getPrintableId() - + " expected=" - + versionOnUpdate - + " actual=" - + foundVersion); - } + // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 + if (forwardedFromCollection + && ulog.getState() != UpdateLog.State.ACTIVE + && isReplayOrPeersync == false) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + if (log.isInfoEnabled()) { + log.info("Leader logic applied but update log is buffering: {}", cmd.getPrintableId()); } + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.add(cmd); + return true; + } - long version = vinfo.getNewClock(); - cmd.setVersion(version); - cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version); - } else { - // The leader forwarded us this update. - cmd.setVersion(versionOnUpdate); + if (versionOnUpdate != 0) { + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + long foundVersion = lastVersion == null ? -1 : lastVersion; + if (versionOnUpdate == foundVersion + || (versionOnUpdate < 0 && foundVersion < 0) + || (versionOnUpdate == 1 && foundVersion > 0)) { + // we're ok if versions match, or if both are negative (all missing docs are equal), + // or if cmd specified it must exist (versionOnUpdate==1) and it does. + } else { + if (cmd.getReq().getParams().getBool(CommonParams.FAIL_ON_VERSION_CONFLICTS, true) + == false) { + return true; + } - if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) { - // we're not in an active state, and this update isn't from a replay, so buffer it. 
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.add(cmd); - return true; + throw new SolrException( + ErrorCode.CONFLICT, + "version conflict for " + + cmd.getPrintableId() + + " expected=" + + versionOnUpdate + + " actual=" + + foundVersion); } + } - if (cmd.isInPlaceUpdate()) { - long prev = cmd.prevVersion; - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - if (lastVersion == null || Math.abs(lastVersion) < prev) { - // this was checked for (in waitForDependentUpdates()) before entering the - // synchronized block. So we shouldn't be here, unless what must've happened is: by - // the time synchronization block was entered, the prev update was deleted by DBQ. - // Since now that update is not in index, the vinfo.lookupVersion() is possibly giving - // us a version from the deleted list (which might be older than the prev update!) - UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate); - - if (fetchedFromLeader instanceof DeleteUpdateCommand) { - if (log.isInfoEnabled()) { - log.info( - "In-place update of {} failed to find valid lastVersion to apply to, and the document was deleted at the leader subsequently.", - idBytes.utf8ToString()); - } - versionDelete((DeleteUpdateCommand) fetchedFromLeader); - return true; - } else { - assert fetchedFromLeader instanceof AddUpdateCommand; - // Newer document was fetched from the leader. Apply that document instead of this - // current in-place update. 
- if (log.isInfoEnabled()) { - log.info( - "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}", - idBytes.utf8ToString(), - fetchedFromLeader); - } - // Make this update to become a non-inplace update containing the full document - // obtained from the leader - cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc; - cmd.prevVersion = -1; - cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD)); - assert cmd.isInPlaceUpdate() == false; + long version = vinfo.getNewClock(); + cmd.setVersion(version); + cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version); + } else { + // The leader forwarded us this update. + cmd.setVersion(versionOnUpdate); + + if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.add(cmd); + return true; + } + + if (cmd.isInPlaceUpdate()) { + long prev = cmd.prevVersion; + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + if (lastVersion == null || Math.abs(lastVersion) < prev) { + // this was checked for (in waitForDependentUpdates()) before entering the + // synchronized block. So we shouldn't be here, unless what must've happened is: by + // the time synchronization block was entered, the prev update was deleted by DBQ. + // Since now that update is not in index, the vinfo.lookupVersion() is possibly giving + // us a version from the deleted list (which might be older than the prev update!) 
+ UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate); + + if (fetchedFromLeader instanceof DeleteUpdateCommand) { + if (log.isInfoEnabled()) { + log.info( + "In-place update of {} failed to find valid lastVersion to apply to, and the document was deleted at the leader subsequently.", + idBytes.utf8ToString()); } + versionDelete((DeleteUpdateCommand) fetchedFromLeader); + return true; } else { - if (Math.abs(lastVersion) > prev) { - // this means we got a newer full doc update and in that case it makes no sense to - // apply the older inplace update. Drop this update + assert fetchedFromLeader instanceof AddUpdateCommand; + // Newer document was fetched from the leader. Apply that document instead of this + // current in-place update. + if (log.isInfoEnabled()) { log.info( - "Update was applied on version: {}, but last version I have is: {}. Dropping current update", - prev, - lastVersion); - return true; - } else { - // We're good, we should apply this update. + "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}", + idBytes.utf8ToString(), + fetchedFromLeader); } + // Make this update to become a non-inplace update containing the full document + // obtained from the leader + cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc; + cmd.prevVersion = -1; + cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD)); + assert cmd.isInPlaceUpdate() == false; } } else { - // if we aren't the leader, then we need to check that updates were not re-ordered - // we need to check the specific version for this id. - Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); - if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { - // This update is a repeat, or was reordered. We need to drop this update. 
- if (log.isDebugEnabled()) { - log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); - } + if (Math.abs(lastVersion) > prev) { + // this means we got a newer full doc update and in that case it makes no sense to + // apply the older inplace update. Drop this update + log.info( + "Update was applied on version: {}, but last version I have is: {}. Dropping current update", + prev, + lastVersion); return true; + } else { + // We're good, we should apply this update. } } - if (!isSubShardLeader - && replicaType == Replica.Type.TLOG - && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { - cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } else { + // if we aren't the leader, then we need to check that updates were not re-ordered + // we need to check the specific version for this id. + Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); + if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { + // This update is a repeat, or was reordered. We need to drop this update. + if (log.isDebugEnabled()) { + log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); + } + return true; } } + if (!isSubShardLeader + && replicaType == Replica.Type.TLOG + && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } } + } - SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy() : null; + SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy() : null; - // TODO: possibly set checkDeleteByQueries as a flag on the command? - doLocalAdd(cmd); + // TODO: possibly set checkDeleteByQueries as a flag on the command? + doLocalAdd(cmd); - if (clonedDoc != null) { - cmd.solrDoc = clonedDoc; - } - } finally { - bucket.unlock(); Review Comment: This is what's gone from this method (moved to caller) and accounted for indentation level decrease. 
########## solr/core/src/java/org/apache/solr/update/VersionInfo.java: ########## @@ -33,18 +30,16 @@ import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.util.RefCounted; +/** + * Related to the {@code _version_} field, in connection with the {@link UpdateLog}. + * + * @lucene.internal + */ public class VersionInfo { - private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = Review Comment: as you can see; removed lock stuff from here; it's now in UpdateLocks ########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -202,6 +202,10 @@ public DistributedUpdateProcessor( // this.rsp = reqInfo != null ? reqInfo.getRsp() : null; } + protected UpdateLocks getUpdateLocks() { Review Comment: for the test ########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -335,209 +330,199 @@ protected boolean versionAdd(AddUpdateCommand cmd) throws IOException { leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, versionOnUpdate); boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; - VersionBucket bucket = vinfo.bucket(bucketHash); - long dependentVersionFound = -1; // if this is an in-place update, check and wait if we should be waiting for a previous update // (on which this update depends), before entering the synchronized block if (!leaderLogic && cmd.isInPlaceUpdate()) { - dependentVersionFound = - waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket); + dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync); if (dependentVersionFound == -1) { // it means the document has been deleted by now at the leader. 
drop this update return true; } } - vinfo.lockForUpdate(); - try { - long finalVersionOnUpdate = versionOnUpdate; - return bucket.runWithLock( - vinfo.getVersionBucketLockTimeoutMs(), - () -> - doVersionAdd( + final long finalVersionOnUpdate = versionOnUpdate; + return getUpdateLocks() + .runWithLock( + cmd.getIndexedId(), + (Condition condition) -> { Review Comment: Even though VersionBucket is gone, the details inside UpdateLocks are not exposed; we merely pass a Condition as a param (a JDK class). The rest of the code here is just moved around for balancing. ########## solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java: ########## @@ -335,209 +330,199 @@ protected boolean versionAdd(AddUpdateCommand cmd) throws IOException { leaderLogicWithVersionIntegrityCheck(isReplayOrPeersync, isLeader, versionOnUpdate); boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; - VersionBucket bucket = vinfo.bucket(bucketHash); - long dependentVersionFound = -1; // if this is an in-place update, check and wait if we should be waiting for a previous update // (on which this update depends), before entering the synchronized block if (!leaderLogic && cmd.isInPlaceUpdate()) { - dependentVersionFound = - waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket); + dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync); if (dependentVersionFound == -1) { // it means the document has been deleted by now at the leader. 
drop this update return true; } } - vinfo.lockForUpdate(); - try { - long finalVersionOnUpdate = versionOnUpdate; - return bucket.runWithLock( - vinfo.getVersionBucketLockTimeoutMs(), - () -> - doVersionAdd( + final long finalVersionOnUpdate = versionOnUpdate; + return getUpdateLocks() + .runWithLock( + cmd.getIndexedId(), + (Condition condition) -> { + // just in case anyone is waiting let them know that we have a new update + // we obtain the version when synchronized and then do the add so we can ensure that + // if version1 < version2 then version1 is actually added before version2. + + // even if we don't store the version field, synchronizing + // will enable us to know what version happened first, and thus enable + // realtime-get to work reliably. + // TODO: if versions aren't stored, do we need to set on the cmd anyway for some + // reason? + // there may be other reasons in the future for a version on the commands + condition.signalAll(); + + return doVersionAdd( cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, - forwardedFromCollection, - bucket)); - } finally { - vinfo.unlockForUpdate(); - } + forwardedFromCollection); + }); } private boolean doVersionAdd( AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, boolean leaderLogic, - boolean forwardedFromCollection, - VersionBucket bucket) Review Comment: no longer need to pass bucket or even Condition; letting the caller deal with that matter. This makes doVersionAdd easier to reason about and one less indentation level. In GitHub this isn't obvious; maybe IntelliJ will show it as just indentation changing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
