This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8ae06dc42fe IGNITE-26394 Fix ItAbstractDataStreamerTest.testFailedItems flakiness (#6605)
8ae06dc42fe is described below
commit 8ae06dc42fe5a9e9ba0eca7b81fafa0854a11c5e
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Sep 17 16:40:24 2025 +0300
IGNITE-26394 Fix ItAbstractDataStreamerTest.testFailedItems flakiness (#6605)
---
.../ignite/internal/streamer/ItAbstractDataStreamerTest.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index d7c22867b0a..a631918fa74 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -581,6 +581,7 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
CompletableFuture<Void> streamerFut;
var invalidItemsAdded = new ArrayList<DataStreamerItem<Tuple>>();
+ String invalidColName = "name1";
try (var publisher = new DirectPublisher<DataStreamerItem<Tuple>>()) {
var options = DataStreamerOptions.builder()
@@ -603,7 +604,7 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
// Submit invalid items.
for (int i = 200; i < 300; i++) {
DataStreamerItem<Tuple> item = DataStreamerItem.of(
- Tuple.create().set("id", i).set("name1", "bar-" + i),
+ Tuple.create().set("id", i).set(invalidColName, "bar-"
+ i),
i % 2 == 0 ? DataStreamerOperationType.PUT :
DataStreamerOperationType.REMOVE);
try {
@@ -634,6 +635,11 @@ public abstract class ItAbstractDataStreamerTest extends
ClusterPerClassIntegrat
}
for (DataStreamerItem<Tuple> failedItem : failedItems) {
+ if (failedItem.get().columnIndex(invalidColName) < 0) {
+ // Valid item failed to flush, ignore.
+ continue;
+ }
+
assertTrue(invalidItemsAdded.contains(failedItem),
"invalidItemsAdded item not found: " + failedItem.get());
}