jbampton commented on a change in pull request #2148:
URL: https://github.com/apache/lucene-solr/pull/2148#discussion_r544307457
##########
File path:
solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
##########
@@ -107,7 +111,21 @@ public ZkWriteCommand modifyCollection(final ClusterState
clusterState, ZkNodePr
DocCollection coll =
clusterState.getCollection(message.getStr(COLLECTION_PROP));
Map<String, Object> m = coll.shallowCopy();
boolean hasAnyOps = false;
+ PerReplicaStates.WriteOps replicaOps = null;
for (String prop :
CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
+ if (prop.equals(DocCollection.PER_REPLICA_STATE)) {
+ String val = message.getStr(DocCollection.PER_REPLICA_STATE);
+ if (val == null) continue;
+ boolean enable = Boolean.parseBoolean(val);
+ if (enable == coll.isPerReplicaState()) {
+ //already enabled
+ log.error("trying to set perReplicaState to {} from {}", val,
coll.isPerReplicaState());
+ continue;
+ }
+ replicaOps = PerReplicaStates.WriteOps.modifyCollection(coll, enable,
PerReplicaStates.fetch(coll.getZNode(), zkClient, null));
+ }
+
+
if (message.containsKey(prop)) {
hasAnyOps = true;
if (message.get(prop) == null) {
Review comment:
```suggestion
if (message.get(prop) == null) {
```
##########
File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java
##########
@@ -474,6 +475,7 @@ public boolean isClosed() {
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
+ zkStateReader.nodeName = nodeName;
Review comment:
```suggestion
zkStateReader.nodeName = nodeName;
```
##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -165,7 +198,15 @@ private boolean maybeFlushAfter() {
public boolean hasPendingUpdates() {
return numUpdates != 0 || isClusterStateModified;
}
+ public ClusterState writeUpdate(ZkWriteCommand command) throws
IllegalStateException, KeeperException, InterruptedException {
+ Map<String, ZkWriteCommand> commands = new HashMap<>();
Review comment:
```suggestion
Map<String, ZkWriteCommand> commands = new HashMap<>();
```
##########
File path:
solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
##########
@@ -489,6 +492,7 @@ private Create(String collection, String config, String
routerName, Integer numS
public Create setStateFormat(Integer stateFormat) { this.stateFormat =
stateFormat; return this; }
public Create setRule(String... s){ this.rule = s; return this; }
public Create setSnitch(String... s){ this.snitch = s; return this; }
+ public Create setPerReplicaState(Boolean b) {this.perReplicaState = b;
return this; }
Review comment:
```suggestion
public Create setPerReplicaState(Boolean b) {this.perReplicaState = b;
return this; }
```
##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -113,20 +114,52 @@ public ClusterState enqueueUpdate(ClusterState prevState,
List<ZkWriteCommand> c
if (cmds.isEmpty()) return prevState;
if (isNoOps(cmds)) return prevState;
+ boolean forceFlush = false;
+ if (cmds.size() == 1) {
+ //most messages result in only one command. let's deal with it right away
+ ZkWriteCommand cmd = cmds.get(0);
+ if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+ //we do not wish to batch any updates for collections with per-replica
state because
+ // these changes go to individual ZK nodes and there is zero advantage
to batching
+ //now check if there are any updates for the same collection already
present
+ if (updates.containsKey(cmd.name)) {
+ //this should not happen
+ // but let's get those updates out anyway
+ writeUpdate(updates.remove(cmd.name));
+ }
+ //now let's write the current message
+ try {
+ return writeUpdate(cmd);
+ } finally {
+ if (callback !=null) callback.onWrite();
+ }
+ }
+ } else {
+ //there are more than one commands created as a result of this message
+ for (ZkWriteCommand cmd : cmds) {
+ if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+ // we don't try to optimize for this case. let's flush out all after
this
+ forceFlush = true;
+ break;
+ }
+ }
+ }
+
+
for (ZkWriteCommand cmd : cmds) {
if (cmd == NO_OP) continue;
if (!isClusterStateModified && clusterStateGetModifiedWith(cmd,
prevState)) {
isClusterStateModified = true;
}
prevState = prevState.copyWith(cmd.name, cmd.collection);
if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
- updates.put(cmd.name, cmd.collection);
+ updates.put(cmd.name, cmd);
numUpdates++;
}
}
clusterState = prevState;
- if (maybeFlushAfter()) {
+ if (forceFlush || maybeFlushAfter()) {
Review comment:
```suggestion
if (forceFlush || maybeFlushAfter()) {
```
##########
File path:
solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
##########
@@ -182,6 +182,7 @@ public void call(ClusterState clusterState, ZkNodeProps
message, @SuppressWarnin
if(created) break;
Review comment:
```suggestion
if (created) break;
```
##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
##########
@@ -281,13 +290,15 @@ private ZkWriteCommand updateState(final ClusterState
prevState, ZkNodeProps mes
}
sliceName = Assign.assignShard(collection, numShards);
log.info("Assigning new node to shard shard={}", sliceName);
+ persistCollectionState = true;
}
Slice slice = collection != null ? collection.getSlice(sliceName) : null;
Review comment:
```suggestion
Slice slice = collection != null ? collection.getSlice(sliceName) : null;
```
##########
File path:
solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java
##########
@@ -100,6 +101,7 @@ public void testTimeCat() throws Exception {
CollectionAdminRequest.DimensionalRoutedAlias dra =
CollectionAdminRequest.createDimensionalRoutedAlias(getAlias(),
CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
+ .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
.setMaxShardsPerNode(2), TRA_Dim, CRA_Dim);
Review comment:
```suggestion
.setMaxShardsPerNode(2), TRA_Dim, CRA_Dim);
```
##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
##########
@@ -208,6 +234,40 @@ public String getProperty(String propertyName) {
return propertyValue;
}
+ public Replica copyWith(PerReplicaStates.State state) {
+ log.debug("A replica is updated with new state : {}", state);
+ Map<String, Object> props = new LinkedHashMap<>(propMap);
+ if (state == null) {
+ props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
+ props.remove(Slice.LEADER);
+ } else {
+ props.put(ZkStateReader.STATE_PROP, state.state.toString());
+ if (state.isLeader) props.put(Slice.LEADER, "true");
+ }
+ Replica r = new Replica(name, props, collection, slice);
+ r.replicaState = state;
+ return r;
+ }
+
+ public PerReplicaStates.State getReplicaState() {
+ return replicaState;
+ }
+
+ private static final Map<String, State> STATES = new HashMap<>();
+ static {
+ STATES.put(Replica.State.ACTIVE.shortName, Replica.State.ACTIVE);
+ STATES.put(Replica.State.DOWN.shortName, Replica.State.DOWN);
+ STATES.put(Replica.State.RECOVERING.shortName, Replica.State.RECOVERING);
+ STATES.put(Replica.State.RECOVERY_FAILED.shortName,
Replica.State.RECOVERY_FAILED);
+ }
+ public static State getState(String c) {
Review comment:
```suggestion
public static State getState(String c) {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]