This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0125fdc1ab IGNITE-21465 Add system views for partition states (#3760)
0125fdc1ab is described below

commit 0125fdc1ab638da975fc8cb57e6e43423d2f5b15
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu May 16 15:27:00 2024 +0300

    IGNITE-21465 Add system views for partition states (#3760)
---
 modules/rest/build.gradle                          |   1 +
 .../runner/app/ItIgniteNodeRestartTest.java        |  31 ++--
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +
 modules/table/build.gradle                         |   4 +
 .../disaster/ItDisasterRecoverySystemViewTest.java | 101 +++++++++++++
 .../disaster/DisasterRecoveryManager.java          |  14 +-
 .../disaster/DisasterRecoverySystemViews.java      | 161 +++++++++++++++++++++
 7 files changed, 298 insertions(+), 16 deletions(-)

diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index da836c615a..fe2cfa2047 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -41,6 +41,7 @@ dependencies {
     implementation project(':ignite-security-api')
     implementation project(':ignite-compute')
     implementation project(':ignite-eventlog')
+    implementation project(':ignite-system-view-api')
     implementation libs.jetbrains.annotations
     implementation libs.micronaut.inject
     implementation libs.micronaut.http.server.netty
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 27ee1b97d5..d33ff17a4c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertionsAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -227,6 +228,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
     private static final String TABLE_NAME = "Table1";
 
     /** Assume that the table id will always be 8 for the test table. There is 
an assertion to check if this is true. */
+    // TODO: IGNITE-22251 Get rid of it
     private static final int TABLE_ID = 8;
 
     /** Test table name. */
@@ -1470,6 +1472,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
             "false,true",
             "false,false"
     })
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22251")
     public void createTableCallOnMultipleNodesTest(boolean 
populateStableAssignmentsBeforeTableCreation, boolean restart)
             throws InterruptedException {
         int nodesCount = 3;
@@ -1593,6 +1596,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
             "1,2",
             "2,1"
     })
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22251")
     public void tableRecoveryOnMultipleRestartingNodes(int 
nodeThatWrittenAssignments, int nodeThatPicksUpAssignments) throws Exception {
         var node0 = startNode(0);
 
@@ -1693,8 +1697,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
     @Test
     public void 
testSequentialAsyncTableCreationThenAlterZoneThenRestartOnMsSnapshot() throws 
InterruptedException {
-        var node0 = startNode(0);
-        var node1 = startNode(1);
+        IgniteImpl node0 = startNode(0);
+        IgniteImpl node1 = startNode(1);
 
         String tableName = "TEST";
         String zoneName = "ZONE_TEST";
@@ -1711,13 +1715,11 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         nodeInhibitor0.startInhibit();
         nodeInhibitor1.startInhibit();
 
-        var assignmentsKey = stablePartAssignmentsKey(new 
TablePartitionId(TABLE_ID, 0));
-
-        var tableFut = createTableInCatalog(node0.catalogManager(), tableName, 
zoneName);
+        CompletableFuture<?> createTableInCatalogFuture = 
createTableInCatalog(node0.catalogManager(), tableName, zoneName);
 
         stopNode(1);
 
-        var alterZoneFut = alterZoneAsync(node0.catalogManager(), zoneName, 1);
+        CompletableFuture<?> alterZoneInCatalogFuture = 
alterZoneInCatalogAsync(node0.catalogManager(), zoneName, 1);
 
         // Wait for the next catalog version: table creation.
         // The next catalog update (alter zone) can't be processed until the 
table creation is completed.
@@ -1739,18 +1741,18 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         node1 = startNode(1);
 
+        ByteArray assignmentsKey = stablePartAssignmentsKey(new 
TablePartitionId(tableId(node1, tableName), 0));
+
         waitForValueInLocalMs(node1.metaStorageManager(), assignmentsKey);
 
         nodeInhibitor0.stopInhibit();
 
-        assertThat(tableFut, willCompleteSuccessfully());
-        assertThat(alterZoneFut, willCompleteSuccessfully());
-
-        assertEquals(TABLE_ID, tableId(node0, tableName));
+        assertThat(createTableInCatalogFuture, willCompleteSuccessfully());
+        assertThat(alterZoneInCatalogFuture, willCompleteSuccessfully());
 
         waitForValueInLocalMs(node0.metaStorageManager(), assignmentsKey);
 
-        var finalNode1 = node1;
+        IgniteImpl finalNode1 = node1;
 
         // Restart is followed by rebalance, because data nodes are 
recalculated after full table creation that is completed after restart.
         assertTrue(waitForCondition(() -> {
@@ -1758,7 +1760,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
             Set<Assignment> assignments1 = 
getAssignmentsFromMetaStorage(finalNode1.metaStorageManager(), 
assignmentsKey.bytes());
 
             return assignments0.size() == 1 && 
assignments0.equals(assignments1);
-
         }, 10_000));
     }
 
@@ -1790,7 +1791,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         return catalogManager.execute(createTableCommand);
     }
 
-    private static CompletableFuture<?> alterZoneAsync(
+    private static CompletableFuture<?> alterZoneInCatalogAsync(
             CatalogManager catalogManager,
             String zoneName,
             @Nullable Integer replicas
@@ -1830,8 +1831,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 : Assignments.fromBytes(e.value()).nodes();
     }
 
-    private int tableId(Ignite node, String tableName) {
-        return (unwrapTableImpl(node.tables().table(tableName))).tableId();
+    private static int tableId(IgniteImpl node, String tableName) {
+        return getTableIdStrict(node.catalogManager(), tableName, 
node.clock().nowLong());
     }
 
     /**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ea4ecdea58..40f6d8194a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -796,6 +796,8 @@ public class IgniteImpl implements Ignite {
                 distributedTblMgr
         );
 
+        systemViewManager.register(disasterRecoveryManager);
+
         indexManager = new IndexManager(
                 schemaManager,
                 distributedTblMgr,
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 327a2b3366..c27af4ae51 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -47,6 +47,7 @@ dependencies {
     implementation project(':ignite-failure-handler')
     implementation project(':ignite-workers')
     implementation project(':ignite-low-watermark')
+    implementation project(':ignite-system-view-api')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.auto.service.annotations
@@ -139,6 +140,8 @@ dependencies {
     integrationTestImplementation project(':ignite-failure-handler')
     integrationTestImplementation project(':ignite-low-watermark')
     integrationTestImplementation project(':ignite-metrics')
+    integrationTestImplementation project(':ignite-system-view-api')
+    integrationTestImplementation project(':ignite-system-view')
     integrationTestImplementation(testFixtures(project))
     integrationTestImplementation(testFixtures(project(':ignite-api')))
     integrationTestImplementation(testFixtures(project(':ignite-core')))
@@ -157,6 +160,7 @@ dependencies {
     
integrationTestImplementation(testFixtures(project(':ignite-low-watermark')))
     
integrationTestImplementation(testFixtures(project(':ignite-failure-handler')))
     integrationTestImplementation(testFixtures(project(':ignite-metrics')))
+    integrationTestImplementation(testFixtures(project(':ignite-sql-engine')))
     integrationTestImplementation libs.fastutil.core
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.calcite.core
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
new file mode 100644
index 0000000000..ce430b4a2e
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
+import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
+import static 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.HEALTHY;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/** For integration testing of disaster recovery system views. */
+public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
+    private static final String ZONE_NAME = "ZONE_" + TABLE_NAME;
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @Override
+    @BeforeAll
+    protected void beforeAll(TestInfo testInfo) {
+        super.beforeAll(testInfo);
+
+        assertThat(systemViewManager().completeRegistration(), 
willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void tearDown() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + ZONE_NAME);
+    }
+
+    @Test
+    void testNoZonesAndTables() {
+        
assertQuery(globalPartitionStatesSystemViewSql()).returnNothing().check();
+        
assertQuery(localPartitionStatesSystemViewSql()).returnNothing().check();
+    }
+
+    @Test
+    void testGlobalPartitionStatesSystemView() {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 2);
+
+        assertQuery(globalPartitionStatesSystemViewSql())
+                .returns(ZONE_NAME, TABLE_NAME, 0, AVAILABLE.name())
+                .returns(ZONE_NAME, TABLE_NAME, 1, AVAILABLE.name())
+                .check();
+    }
+
+    @Test
+    void testLocalPartitionStatesSystemView() {
+        assertEquals(2, initialNodes());
+
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 2);
+
+        List<String> nodeNames = 
CLUSTER.runningNodes().map(IgniteImpl::name).sorted().collect(toList());
+
+        String nodeName0 = nodeNames.get(0);
+        String nodeName1 = nodeNames.get(1);
+
+        assertQuery(localPartitionStatesSystemViewSql())
+                .returns(nodeName0, ZONE_NAME, TABLE_NAME, 0, HEALTHY.name())
+                .returns(nodeName0, ZONE_NAME, TABLE_NAME, 1, HEALTHY.name())
+                .returns(nodeName1, ZONE_NAME, TABLE_NAME, 0, HEALTHY.name())
+                .returns(nodeName1, ZONE_NAME, TABLE_NAME, 1, HEALTHY.name())
+                .check();
+    }
+
+    private static String globalPartitionStatesSystemViewSql() {
+        return "SELECT ZONE_NAME, TABLE_NAME, PARTITION_ID, STATE FROM 
SYSTEM.GLOBAL_PARTITION_STATES";
+    }
+
+    private static String localPartitionStatesSystemViewSql() {
+        return "SELECT NODE_NAME, ZONE_NAME, TABLE_NAME, PARTITION_ID, STATE 
FROM SYSTEM.LOCAL_PARTITION_STATES";
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 969051feda..61e11429f3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -24,6 +24,8 @@ import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalPartitionStatesSystemView;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalPartitionStatesSystemView;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
@@ -69,6 +71,8 @@ import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViewProvider;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -94,7 +98,7 @@ import org.jetbrains.annotations.Nullable;
  * As a reaction to these updates, manager performs actual recovery 
operations, such as {@link #resetPartitions(String, String, Set)}.
  * More details are in the <a 
href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
  */
-public class DisasterRecoveryManager implements IgniteComponent {
+public class DisasterRecoveryManager implements IgniteComponent, 
SystemViewProvider {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(DisasterRecoveryManager.class);
 
@@ -200,6 +204,14 @@ public class DisasterRecoveryManager implements 
IgniteComponent {
         return nullCompletedFuture();
     }
 
+    @Override
+    public List<SystemView<?>> systemViews() {
+        return List.of(
+                createGlobalPartitionStatesSystemView(this),
+                createLocalPartitionStatesSystemView(this)
+        );
+    }
+
     /**
      * Updates assignments of the table in a forced manner, allowing for the 
recovery of raft group with lost majorities. It is achieved via
      * triggering a new rebalance with {@code force} flag enabled in {@link 
Assignments} for partitions where it's required. New pending
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
new file mode 100644
index 0000000000..4e0d964a1c
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static java.util.Comparator.comparing;
+import static org.apache.ignite.internal.type.NativeTypes.INT32;
+import static org.apache.ignite.internal.type.NativeTypes.STRING;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViews;
+
+/** Helper class for disaster recovery system views. */
+class DisasterRecoverySystemViews {
+    private static final Comparator<GlobalPartitionState> 
GLOBAL_PARTITION_STATE_COMPARATOR =
+            comparing((GlobalPartitionState state) -> 
state.tableName).thenComparingInt(state -> state.partitionId);
+
+    private static final Comparator<SystemViewLocalPartitionState> 
SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR =
+            comparing((SystemViewLocalPartitionState state) -> 
state.state.tableName)
+                    .thenComparingInt(state -> state.state.partitionId)
+                    .thenComparing(state -> state.nodeName);
+
+    static SystemView<?> 
createGlobalPartitionStatesSystemView(DisasterRecoveryManager manager) {
+        return SystemViews.<GlobalPartitionState>clusterViewBuilder()
+                .name("GLOBAL_PARTITION_STATES")
+                .addColumn("ZONE_NAME", STRING, state -> state.zoneName)
+                .addColumn("TABLE_NAME", STRING, state -> state.tableName)
+                .addColumn("PARTITION_ID", INT32, state -> state.partitionId)
+                .addColumn("STATE", STRING, state -> state.state.name())
+                .dataProvider(systemViewPublisher(() -> 
globalPartitionStatesAsync(manager)))
+                .build();
+    }
+
+    static SystemView<?> 
createLocalPartitionStatesSystemView(DisasterRecoveryManager manager) {
+        return SystemViews.<SystemViewLocalPartitionState>clusterViewBuilder()
+                .name("LOCAL_PARTITION_STATES")
+                .addColumn("NODE_NAME", STRING, state -> state.nodeName)
+                .addColumn("ZONE_NAME", STRING, state -> state.state.zoneName)
+                .addColumn("TABLE_NAME", STRING, state -> 
state.state.tableName)
+                .addColumn("PARTITION_ID", INT32, state -> 
state.state.partitionId)
+                .addColumn("STATE", STRING, state -> state.state.state.name())
+                .dataProvider(systemViewPublisher(() -> 
localPartitionStatesAsync(manager)))
+                .build();
+    }
+
+    private static <T> Publisher<T> 
systemViewPublisher(Supplier<CompletableFuture<Iterator<T>>> invokeApi) {
+        return subscriber -> {
+            CompletableFuture<Iterator<T>> invokeApiFuture = invokeApi.get();
+
+            subscriber.onSubscribe(new SystemViewSubscription<>(subscriber, 
invokeApiFuture));
+        };
+    }
+
+    private static CompletableFuture<Iterator<GlobalPartitionState>> 
globalPartitionStatesAsync(DisasterRecoveryManager manager) {
+        return manager.globalPartitionStates(Set.of(), 
Set.of()).thenApply(states -> states.values().stream()
+                .sorted(GLOBAL_PARTITION_STATE_COMPARATOR)
+                .iterator()
+        );
+    }
+
+    private static CompletableFuture<Iterator<SystemViewLocalPartitionState>> 
localPartitionStatesAsync(DisasterRecoveryManager manager) {
+        return manager.localPartitionStates(Set.of(), Set.of(), 
Set.of()).thenApply(states -> states.values().stream()
+                .flatMap(statesByNodeName -> 
statesByNodeName.entrySet().stream())
+                .map(nodeStates -> new 
SystemViewLocalPartitionState(nodeStates.getKey(), nodeStates.getValue()))
+                .sorted(SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR)
+                .iterator()
+        );
+    }
+
+    private static class SystemViewLocalPartitionState {
+        private final String nodeName;
+
+        private final LocalPartitionState state;
+
+        private SystemViewLocalPartitionState(String nodeName, 
LocalPartitionState state) {
+            this.nodeName = nodeName;
+            this.state = state;
+        }
+    }
+
+    private static class SystemViewSubscription<T> implements Subscription {
+        private final Subscriber<? super T> subscriber;
+
+        private final CompletableFuture<Iterator<T>> invokeApiFuture;
+
+        private volatile boolean complete;
+
+        private SystemViewSubscription(Subscriber<? super T> subscriber, 
CompletableFuture<Iterator<T>> invokeApiFuture) {
+            this.subscriber = subscriber;
+            this.invokeApiFuture = invokeApiFuture;
+        }
+
+        @Override
+        public void request(long n) {
+            if (n <= 0) {
+                onError(new IllegalArgumentException("Must be positive: " + 
n));
+
+                return;
+            }
+
+            if (complete) {
+                return;
+            }
+
+            invokeApiFuture.whenComplete((iterator, throwable) -> {
+                if (throwable != null) {
+                    onError(throwable);
+                } else {
+                    drain(iterator, n);
+                }
+            });
+        }
+
+        @Override
+        public void cancel() {
+            complete = true;
+        }
+
+        private void onError(Throwable t) {
+            cancel();
+            subscriber.onError(t);
+        }
+
+        private void drain(Iterator<T> iterator, long n) {
+            if (complete) {
+                return;
+            }
+
+            while (iterator.hasNext() && n-- > 0) {
+                subscriber.onNext(iterator.next());
+            }
+
+            if (!iterator.hasNext()) {
+                complete = true;
+                subscriber.onComplete();
+            }
+        }
+    }
+}

Reply via email to