This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0125fdc1ab IGNITE-21465 Add system views for partition states (#3760)
0125fdc1ab is described below
commit 0125fdc1ab638da975fc8cb57e6e43423d2f5b15
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu May 16 15:27:00 2024 +0300
IGNITE-21465 Add system views for partition states (#3760)
---
modules/rest/build.gradle | 1 +
.../runner/app/ItIgniteNodeRestartTest.java | 31 ++--
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +
modules/table/build.gradle | 4 +
.../disaster/ItDisasterRecoverySystemViewTest.java | 101 +++++++++++++
.../disaster/DisasterRecoveryManager.java | 14 +-
.../disaster/DisasterRecoverySystemViews.java | 161 +++++++++++++++++++++
7 files changed, 298 insertions(+), 16 deletions(-)
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index da836c615a..fe2cfa2047 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -41,6 +41,7 @@ dependencies {
implementation project(':ignite-security-api')
implementation project(':ignite-compute')
implementation project(':ignite-eventlog')
+ implementation project(':ignite-system-view-api')
implementation libs.jetbrains.annotations
implementation libs.micronaut.inject
implementation libs.micronaut.http.server.netty
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 27ee1b97d5..d33ff17a4c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertionsAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -227,6 +228,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
private static final String TABLE_NAME = "Table1";
/** Assume that the table id will always be 8 for the test table. There is
an assertion to check if this is true. */
+ // TODO: IGNITE-22251 Get rid of it
private static final int TABLE_ID = 8;
/** Test table name. */
@@ -1470,6 +1472,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
"false,true",
"false,false"
})
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-22251")
public void createTableCallOnMultipleNodesTest(boolean
populateStableAssignmentsBeforeTableCreation, boolean restart)
throws InterruptedException {
int nodesCount = 3;
@@ -1593,6 +1596,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
"1,2",
"2,1"
})
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-22251")
public void tableRecoveryOnMultipleRestartingNodes(int
nodeThatWrittenAssignments, int nodeThatPicksUpAssignments) throws Exception {
var node0 = startNode(0);
@@ -1693,8 +1697,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
@Test
public void
testSequentialAsyncTableCreationThenAlterZoneThenRestartOnMsSnapshot() throws
InterruptedException {
- var node0 = startNode(0);
- var node1 = startNode(1);
+ IgniteImpl node0 = startNode(0);
+ IgniteImpl node1 = startNode(1);
String tableName = "TEST";
String zoneName = "ZONE_TEST";
@@ -1711,13 +1715,11 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
nodeInhibitor0.startInhibit();
nodeInhibitor1.startInhibit();
- var assignmentsKey = stablePartAssignmentsKey(new
TablePartitionId(TABLE_ID, 0));
-
- var tableFut = createTableInCatalog(node0.catalogManager(), tableName,
zoneName);
+ CompletableFuture<?> createTableInCatalogFuture =
createTableInCatalog(node0.catalogManager(), tableName, zoneName);
stopNode(1);
- var alterZoneFut = alterZoneAsync(node0.catalogManager(), zoneName, 1);
+ CompletableFuture<?> alterZoneInCatalogFuture =
alterZoneInCatalogAsync(node0.catalogManager(), zoneName, 1);
// Wait for the next catalog version: table creation.
// The next catalog update (alter zone) can't be processed until the
table creation is completed.
@@ -1739,18 +1741,18 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
node1 = startNode(1);
+ ByteArray assignmentsKey = stablePartAssignmentsKey(new
TablePartitionId(tableId(node1, tableName), 0));
+
waitForValueInLocalMs(node1.metaStorageManager(), assignmentsKey);
nodeInhibitor0.stopInhibit();
- assertThat(tableFut, willCompleteSuccessfully());
- assertThat(alterZoneFut, willCompleteSuccessfully());
-
- assertEquals(TABLE_ID, tableId(node0, tableName));
+ assertThat(createTableInCatalogFuture, willCompleteSuccessfully());
+ assertThat(alterZoneInCatalogFuture, willCompleteSuccessfully());
waitForValueInLocalMs(node0.metaStorageManager(), assignmentsKey);
- var finalNode1 = node1;
+ IgniteImpl finalNode1 = node1;
// Restart is followed by rebalance, because data nodes are
recalculated after full table creation that is completed after restart.
assertTrue(waitForCondition(() -> {
@@ -1758,7 +1760,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
Set<Assignment> assignments1 =
getAssignmentsFromMetaStorage(finalNode1.metaStorageManager(),
assignmentsKey.bytes());
return assignments0.size() == 1 &&
assignments0.equals(assignments1);
-
}, 10_000));
}
@@ -1790,7 +1791,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
return catalogManager.execute(createTableCommand);
}
- private static CompletableFuture<?> alterZoneAsync(
+ private static CompletableFuture<?> alterZoneInCatalogAsync(
CatalogManager catalogManager,
String zoneName,
@Nullable Integer replicas
@@ -1830,8 +1831,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
: Assignments.fromBytes(e.value()).nodes();
}
- private int tableId(Ignite node, String tableName) {
- return (unwrapTableImpl(node.tables().table(tableName))).tableId();
+ private static int tableId(IgniteImpl node, String tableName) {
+ return getTableIdStrict(node.catalogManager(), tableName,
node.clock().nowLong());
}
/**
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ea4ecdea58..40f6d8194a 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -796,6 +796,8 @@ public class IgniteImpl implements Ignite {
distributedTblMgr
);
+ systemViewManager.register(disasterRecoveryManager);
+
indexManager = new IndexManager(
schemaManager,
distributedTblMgr,
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 327a2b3366..c27af4ae51 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -47,6 +47,7 @@ dependencies {
implementation project(':ignite-failure-handler')
implementation project(':ignite-workers')
implementation project(':ignite-low-watermark')
+ implementation project(':ignite-system-view-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
@@ -139,6 +140,8 @@ dependencies {
integrationTestImplementation project(':ignite-failure-handler')
integrationTestImplementation project(':ignite-low-watermark')
integrationTestImplementation project(':ignite-metrics')
+ integrationTestImplementation project(':ignite-system-view-api')
+ integrationTestImplementation project(':ignite-system-view')
integrationTestImplementation(testFixtures(project))
integrationTestImplementation(testFixtures(project(':ignite-api')))
integrationTestImplementation(testFixtures(project(':ignite-core')))
@@ -157,6 +160,7 @@ dependencies {
integrationTestImplementation(testFixtures(project(':ignite-low-watermark')))
integrationTestImplementation(testFixtures(project(':ignite-failure-handler')))
integrationTestImplementation(testFixtures(project(':ignite-metrics')))
+ integrationTestImplementation(testFixtures(project(':ignite-sql-engine')))
integrationTestImplementation libs.fastutil.core
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.calcite.core
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
new file mode 100644
index 0000000000..ce430b4a2e
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.table.TableTestUtils.TABLE_NAME;
+import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
+import static
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum.HEALTHY;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/** For integration testing of disaster recovery system views. */
+public class ItDisasterRecoverySystemViewTest extends BaseSqlIntegrationTest {
+ private static final String ZONE_NAME = "ZONE_" + TABLE_NAME;
+
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @Override
+ @BeforeAll
+ protected void beforeAll(TestInfo testInfo) {
+ super.beforeAll(testInfo);
+
+ assertThat(systemViewManager().completeRegistration(),
willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void tearDown() {
+ sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ sql("DROP ZONE IF EXISTS " + ZONE_NAME);
+ }
+
+ @Test
+ void testNoZonesAndTables() {
+
assertQuery(globalPartitionStatesSystemViewSql()).returnNothing().check();
+
assertQuery(localPartitionStatesSystemViewSql()).returnNothing().check();
+ }
+
+ @Test
+ void testGlobalPartitionStatesSystemView() {
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 2);
+
+ assertQuery(globalPartitionStatesSystemViewSql())
+ .returns(ZONE_NAME, TABLE_NAME, 0, AVAILABLE.name())
+ .returns(ZONE_NAME, TABLE_NAME, 1, AVAILABLE.name())
+ .check();
+ }
+
+ @Test
+ void testLocalPartitionStatesSystemView() {
+ assertEquals(2, initialNodes());
+
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 2);
+
+ List<String> nodeNames =
CLUSTER.runningNodes().map(IgniteImpl::name).sorted().collect(toList());
+
+ String nodeName0 = nodeNames.get(0);
+ String nodeName1 = nodeNames.get(1);
+
+ assertQuery(localPartitionStatesSystemViewSql())
+ .returns(nodeName0, ZONE_NAME, TABLE_NAME, 0, HEALTHY.name())
+ .returns(nodeName0, ZONE_NAME, TABLE_NAME, 1, HEALTHY.name())
+ .returns(nodeName1, ZONE_NAME, TABLE_NAME, 0, HEALTHY.name())
+ .returns(nodeName1, ZONE_NAME, TABLE_NAME, 1, HEALTHY.name())
+ .check();
+ }
+
+ private static String globalPartitionStatesSystemViewSql() {
+ return "SELECT ZONE_NAME, TABLE_NAME, PARTITION_ID, STATE FROM
SYSTEM.GLOBAL_PARTITION_STATES";
+ }
+
+ private static String localPartitionStatesSystemViewSql() {
+ return "SELECT NODE_NAME, ZONE_NAME, TABLE_NAME, PARTITION_ID, STATE
FROM SYSTEM.LOCAL_PARTITION_STATES";
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 969051feda..61e11429f3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -24,6 +24,8 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createGlobalPartitionStatesSystemView;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews.createLocalPartitionStatesSystemView;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.AVAILABLE;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.DEGRADED;
import static
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum.READ_ONLY;
@@ -69,6 +71,8 @@ import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -94,7 +98,7 @@ import org.jetbrains.annotations.Nullable;
* As a reaction to these updates, manager performs actual recovery
operations, such as {@link #resetPartitions(String, String, Set)}.
* More details are in the <a
href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
*/
-public class DisasterRecoveryManager implements IgniteComponent {
+public class DisasterRecoveryManager implements IgniteComponent,
SystemViewProvider {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(DisasterRecoveryManager.class);
@@ -200,6 +204,14 @@ public class DisasterRecoveryManager implements
IgniteComponent {
return nullCompletedFuture();
}
+ @Override
+ public List<SystemView<?>> systemViews() {
+ return List.of(
+ createGlobalPartitionStatesSystemView(this),
+ createLocalPartitionStatesSystemView(this)
+ );
+ }
+
/**
* Updates assignments of the table in a forced manner, allowing for the
recovery of raft group with lost majorities. It is achieved via
* triggering a new rebalance with {@code force} flag enabled in {@link
Assignments} for partitions where it's required. New pending
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
new file mode 100644
index 0000000000..4e0d964a1c
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoverySystemViews.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import static java.util.Comparator.comparing;
+import static org.apache.ignite.internal.type.NativeTypes.INT32;
+import static org.apache.ignite.internal.type.NativeTypes.STRING;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.systemview.api.SystemView;
+import org.apache.ignite.internal.systemview.api.SystemViews;
+
+/** Helper class for disaster recovery system views. */
+class DisasterRecoverySystemViews {
+ private static final Comparator<GlobalPartitionState>
GLOBAL_PARTITION_STATE_COMPARATOR =
+ comparing((GlobalPartitionState state) ->
state.tableName).thenComparingInt(state -> state.partitionId);
+
+ private static final Comparator<SystemViewLocalPartitionState>
SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR =
+ comparing((SystemViewLocalPartitionState state) ->
state.state.tableName)
+ .thenComparingInt(state -> state.state.partitionId)
+ .thenComparing(state -> state.nodeName);
+
+ static SystemView<?>
createGlobalPartitionStatesSystemView(DisasterRecoveryManager manager) {
+ return SystemViews.<GlobalPartitionState>clusterViewBuilder()
+ .name("GLOBAL_PARTITION_STATES")
+ .addColumn("ZONE_NAME", STRING, state -> state.zoneName)
+ .addColumn("TABLE_NAME", STRING, state -> state.tableName)
+ .addColumn("PARTITION_ID", INT32, state -> state.partitionId)
+ .addColumn("STATE", STRING, state -> state.state.name())
+ .dataProvider(systemViewPublisher(() ->
globalPartitionStatesAsync(manager)))
+ .build();
+ }
+
+ static SystemView<?>
createLocalPartitionStatesSystemView(DisasterRecoveryManager manager) {
+ return SystemViews.<SystemViewLocalPartitionState>clusterViewBuilder()
+ .name("LOCAL_PARTITION_STATES")
+ .addColumn("NODE_NAME", STRING, state -> state.nodeName)
+ .addColumn("ZONE_NAME", STRING, state -> state.state.zoneName)
+ .addColumn("TABLE_NAME", STRING, state ->
state.state.tableName)
+ .addColumn("PARTITION_ID", INT32, state ->
state.state.partitionId)
+ .addColumn("STATE", STRING, state -> state.state.state.name())
+ .dataProvider(systemViewPublisher(() ->
localPartitionStatesAsync(manager)))
+ .build();
+ }
+
+ private static <T> Publisher<T>
systemViewPublisher(Supplier<CompletableFuture<Iterator<T>>> invokeApi) {
+ return subscriber -> {
+ CompletableFuture<Iterator<T>> invokeApiFuture = invokeApi.get();
+
+ subscriber.onSubscribe(new SystemViewSubscription<>(subscriber,
invokeApiFuture));
+ };
+ }
+
+ private static CompletableFuture<Iterator<GlobalPartitionState>>
globalPartitionStatesAsync(DisasterRecoveryManager manager) {
+ return manager.globalPartitionStates(Set.of(),
Set.of()).thenApply(states -> states.values().stream()
+ .sorted(GLOBAL_PARTITION_STATE_COMPARATOR)
+ .iterator()
+ );
+ }
+
+ private static CompletableFuture<Iterator<SystemViewLocalPartitionState>>
localPartitionStatesAsync(DisasterRecoveryManager manager) {
+ return manager.localPartitionStates(Set.of(), Set.of(),
Set.of()).thenApply(states -> states.values().stream()
+ .flatMap(statesByNodeName ->
statesByNodeName.entrySet().stream())
+ .map(nodeStates -> new
SystemViewLocalPartitionState(nodeStates.getKey(), nodeStates.getValue()))
+ .sorted(SYSTEM_VIEW_LOCAL_PARTITION_STATE_COMPARATOR)
+ .iterator()
+ );
+ }
+
+ private static class SystemViewLocalPartitionState {
+ private final String nodeName;
+
+ private final LocalPartitionState state;
+
+ private SystemViewLocalPartitionState(String nodeName,
LocalPartitionState state) {
+ this.nodeName = nodeName;
+ this.state = state;
+ }
+ }
+
+ private static class SystemViewSubscription<T> implements Subscription {
+ private final Subscriber<? super T> subscriber;
+
+ private final CompletableFuture<Iterator<T>> invokeApiFuture;
+
+ private volatile boolean complete;
+
+ private SystemViewSubscription(Subscriber<? super T> subscriber,
CompletableFuture<Iterator<T>> invokeApiFuture) {
+ this.subscriber = subscriber;
+ this.invokeApiFuture = invokeApiFuture;
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ onError(new IllegalArgumentException("Must be positive: " +
n));
+
+ return;
+ }
+
+ if (complete) {
+ return;
+ }
+
+ invokeApiFuture.whenComplete((iterator, throwable) -> {
+ if (throwable != null) {
+ onError(throwable);
+ } else {
+ drain(iterator, n);
+ }
+ });
+ }
+
+ @Override
+ public void cancel() {
+ complete = true;
+ }
+
+ private void onError(Throwable t) {
+ cancel();
+ subscriber.onError(t);
+ }
+
+ private void drain(Iterator<T> iterator, long n) {
+ if (complete) {
+ return;
+ }
+
+ while (iterator.hasNext() && n-- > 0) {
+ subscriber.onNext(iterator.next());
+ }
+
+ if (!iterator.hasNext()) {
+ complete = true;
+ subscriber.onComplete();
+ }
+ }
+ }
+}