tkalkirill commented on code in PR #3358:
URL: https://github.com/apache/ignite-3/pull/3358#discussion_r1522663198


##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +198,56 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private void onIndexRemoved(RemoveIndexEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int catalogVersion = parameters.catalogVersion();
+            int previousCatalogVersion = catalogVersion - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, previousCatalogVersion);
+            assert indexDescriptor != null : "index";
+
+            int tableId = indexDescriptor.tableId();
+
+            if (catalogService.table(tableId, catalogVersion) == null) {
+                // Nothing to do. Index will be destroyed along with the table.
+                return;
+            }
+
+            destructionEventsQueue.enqueue(new 
DestroyIndexEvent(catalogVersion, indexId, tableId));
+        });
+    }
+
+    private CompletableFuture<Void> onLwmChanged(HybridTimestamp ts) {
+        return inBusyLockAsync(busyLock, () -> {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(ts));
+
+            List<DestroyIndexEvent> events = 
destructionEventsQueue.drainUpTo(newEarliestCatalogVersion);
+
+            if (events.isEmpty()) {
+                return nullCompletedFuture();
+            }
+
+            List<CompletableFuture<Void>> futures = events.stream()
+                    .map(event -> destroyIndexAsync(event.indexId(), 
event.tableId()))
+                    .collect(toList());
+
+            return allOf(futures.toArray(CompletableFuture[]::new));
+        });
+    }
+
+    private CompletableFuture<Void> destroyIndexAsync(int indexId, int 
tableId) {
+        return runAsync(() -> inBusyLock(busyLock, () -> {

Review Comment:
   For me, this method does not need a busyLock since it is called already in 
it. 
   And also it does not need to be called asynchronously (`runAsync`). 
   But method `unregisterIndex` itself must return the future.



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
+import 
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Low watermark dummy implementation, which requires explicit {@link 
#updateAndNotify(HybridTimestamp)} method call to notify listeners.
+ * This implementation has no persistent state and notifies listeners 
instantly in same thread.
+ */
+public class TestLowWatermark implements LowWatermark {
+    private final List<LowWatermarkChangedListener> listeners = new 
CopyOnWriteArrayList<>();
+    private volatile HybridTimestamp ts;

Review Comment:
   ```suggestion
       @Nullable private volatile HybridTimestamp ts;
   ```



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TestLowWatermark.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
+import 
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Low watermark dummy implementation, which requires explicit {@link 
#updateAndNotify(HybridTimestamp)} method call to notify listeners.
+ * This implementation has no persistent state and notifies listeners 
instantly in same thread.
+ */
+public class TestLowWatermark implements LowWatermark {
+    private final List<LowWatermarkChangedListener> listeners = new 
CopyOnWriteArrayList<>();
+    private volatile HybridTimestamp ts;
+
+    @Override
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return ts;
+    }
+
+    @Override
+    public void addUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(LowWatermarkChangedListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * Update low watermark and notify listeners.
+     *
+     * @param newTs New timestamp.
+     * @return Listener notification future.
+     */
+    public CompletableFuture<Void> updateAndNotify(HybridTimestamp newTs) {
+        assert newTs != null;
+        assert ts == null || ts.longValue() < newTs.longValue();

Review Comment:
   ```suggestion
           assert ts == null || ts.longValue() < newTs.longValue() : "ts=" + ts 
+ ", newTs=" + newTs;
   ```



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java:
##########
@@ -261,52 +280,60 @@ void stop() {
 
         busyLock.block();
 
-        catalogService.removeListener(CatalogEvent.TABLE_DESTROY, 
(EventListener) this);
-        
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
(EventListener) this);
+        lowWatermark.removeUpdateListener(lwmListener);
+        catalogService.removeListener(CatalogEvent.TABLE_DROP, 
dropTableEventListener);
+        
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaEventListener);
         primaryReplicas.clear();
     }
 
-    @Override
-    public CompletableFuture<Boolean> notify(EventParameters parameters) {
-        if (!busyLock.enterBusy()) {
-            return CompletableFuture.failedFuture(new NodeStoppingException());
-        }
+    private void onPrimaryReplicaChanged(PrimaryReplicaEventParameters 
primaryReplicaEvent) {
+        inBusyLock(busyLock, () -> {
+            if (!(primaryReplicaEvent.groupId() instanceof TablePartitionId)) {
+                return;
+            }
 
-        try {
-            return notifyInternal(parameters);
-        } finally {
-            busyLock.leaveBusy();
-        }
+            TablePartitionId tablePartitionId = (TablePartitionId) 
primaryReplicaEvent.groupId();
+
+            updatePrimaryReplica(tablePartitionId, 
primaryReplicaEvent.startTime(), primaryReplicaEvent.leaseholder());
+        });
     }
 
-    private CompletableFuture<Boolean> notifyInternal(EventParameters 
parameters) {
-        if (parameters instanceof DestroyTableEventParameters) {
-            removeTable((DestroyTableEventParameters) parameters);
+    private void onTableDrop(DropTableEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            DropTableEventParameters event = parameters;
 
-            return falseCompletedFuture();
-        }
+            int tableId = event.tableId();
+            int catalogVersion = event.catalogVersion();
+            int previousVersion = catalogVersion - 1;

Review Comment:
   Please rename to `previousCatalogVersion`.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java:
##########
@@ -198,6 +198,56 @@ private CompletableFuture<Boolean> 
onIndexCreate(CreateIndexEventParameters para
         });
     }
 
+    private void onIndexRemoved(RemoveIndexEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int indexId = parameters.indexId();
+            int catalogVersion = parameters.catalogVersion();
+            int previousCatalogVersion = catalogVersion - 1;
+
+            // Retrieve descriptor during synchronous call, before the 
previous catalog version could be concurrently compacted.
+            CatalogIndexDescriptor indexDescriptor = 
catalogService.index(indexId, previousCatalogVersion);
+            assert indexDescriptor != null : "index";

Review Comment:
   ```suggestion
               assert indexDescriptor != null : "indexId=" + indexId + ", 
catalogVersion=" + previousCatalogVersion;
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/SynchronousPriorityQueue.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A thread-safe wrapper over {@link java.util.PriorityQueue}, which uses 
{@code long} value for ordering.
+ * The implementation provides a method to poll top item up to the given 
priority.
+ *
+ * @param <T> Item type.
+ */
+public class SynchronousPriorityQueue<T> {
+    /** A queue. Guarded by itself. */
+    private final PriorityQueue<T> queue;
+    private final ToLongFunction<T> priorityExtractor;
+
+    /**
+     * Creates a queue.
+     *
+     * @param priorityExtractor Priority extractor.
+     */
+    public SynchronousPriorityQueue(ToLongFunction<T> priorityExtractor) {

Review Comment:
   It has become better, but it is still not obvious why there should be a 
`long`. 
   Why am I grumbling, the class looks like a general one, but on the other 
hand it is very specific.
   
   If we want to make it general, then I think it’s better to pass a function 
to retrieve the priority being compared (`int`, `long`, `string`, etc.). Or 
rename it to queue `LwmDestuctionQueue`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUtils.java:
##########
@@ -84,32 +87,34 @@ public static void registerIndexToTable(
      * @param catalogService Catalog service.
      * @param partitionSet Partitions for which index storages will need to be 
created if they are missing.
      * @param schemaRegistry Table schema register.
+     * @param lwm Low watermark, which is used to extract alive indexes from 
the catalog.

Review Comment:
   Please describe how exactly it is used, it’s not entirely clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to