alievmirza commented on code in PR #1572:
URL: https://github.com/apache/ignite-3/pull/1572#discussion_r1095444376
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1782,11 +1821,70 @@ private RuntimeException convertThrowable(Throwable th)
{
return new IgniteException(th);
}
- /**
- * Register the new meta storage listener for changes in the
rebalance-specific keys.
- */
- private void registerRebalanceListeners() {
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
new WatchListener() {
+ private WatchListener createDistributionZonesDataNodesListener() {
Review Comment:
Please add Javadocs here and below for the new methods that you've introduced.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1782,11 +1821,70 @@ private RuntimeException convertThrowable(Throwable th)
{
return new IgniteException(th);
}
- /**
- * Register the new meta storage listener for changes in the
rebalance-specific keys.
- */
- private void registerRebalanceListeners() {
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
new WatchListener() {
+ private WatchListener createDistributionZonesDataNodesListener() {
+ return new WatchListener() {
+ @Override
+ public void onUpdate(WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(new
NodeStoppingException());
+ }
+
+ try {
+ byte[] dataNodesBytes =
evt.entryEvent().newEntry().value();
+
+ if (dataNodesBytes == null) {
+ //The zone was removed so data nodes was removed too.
+ return;
+ }
+
+ NamedConfigurationTree<TableConfiguration, TableView,
TableChange> tables = tablesCfg.tables();
+
+ int zoneId =
extractZoneId(evt.entryEvent().newEntry().key());
+
+ Set<String> dataNodes =
ByteUtils.fromBytes(dataNodesBytes);
+
+ for (int i = 0; i < tables.value().size(); i++) {
+ TableView tableView = tables.value().get(i);
+
+ int tableZoneId = tableView.zoneId();
+
+ if (zoneId == tableZoneId) {
+ TableConfiguration tableCfg =
tables.get(tableView.name());
+
+ for (int part = 0; part < tableView.partitions();
part++) {
+ UUID tableId = ((ExtendedTableConfiguration)
tableCfg).id().value();
+
+ TablePartitionId replicaGrpId = new
TablePartitionId(tableId, part);
+
+ int partId = part;
+
+ updatePendingAssignmentsKeys(
+ tableView.name(), replicaGrpId,
dataNodes, tableView.replicas(),
+
evt.entryEvent().newEntry().revision(), metaStorageMgr, part
+ ).exceptionally(e -> {
+ LOG.error(
Review Comment:
Should this be logged at error level or at warn level?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]