This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git
The following commit(s) were added to refs/heads/main by this push:
new 3725e23 Test sender maxInflight with multiple values (#93)
3725e23 is described below
commit 3725e23f2e4598e99284ff827bf5bfea1973e40f
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Dec 7 16:55:04 2022 +0100
Test sender maxInflight with multiple values (#93)
---
.../adapter/AdaptedReactiveMessageSenderTest.java | 35 +++++++++++++---------
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index 43f73dd..163d166 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.InOrder;
import org.mockito.Mockito;
@@ -328,20 +329,25 @@ class AdaptedReactiveMessageSenderTest {
assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4),
Duration.ofSeconds(5));
}
- @Test
- void maxInFlightUsingSendOne() throws Exception {
- doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux
- .flatMap((i) ->
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100));
+ @ParameterizedTest
+ @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+ void maxInFlightUsingSendOne(int maxInflight, int maxElements) throws
Exception {
+ doTestMaxInFlight(
+ (reactiveSender, inputFlux) -> inputFlux
+ .flatMap((i) ->
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100),
+ maxInflight, maxElements);
}
- @Test
- void maxInFlightUsingSendMany() throws Exception {
+ @ParameterizedTest
+ @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+ void maxInFlightUsingSendMany(int maxInflight, int maxElements) throws
Exception {
doTestMaxInFlight((reactiveSender, inputFlux) ->
inputFlux.window(3).flatMap(
- (subFlux) -> subFlux.map((i) ->
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100));
+ (subFlux) -> subFlux.map((i) ->
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100),
+ maxInflight, maxElements);
}
- void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>,
Flux<Integer>, Flux<MessageId>> sendingFunction)
- throws Exception {
+ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>,
Flux<Integer>, Flux<MessageId>> sendingFunction,
+ int maxInflight, int maxElements) throws Exception {
ScheduledExecutorService executorService = null;
try {
executorService =
Executors.newSingleThreadScheduledExecutor();
@@ -368,7 +374,7 @@ class AdaptedReactiveMessageSenderTest {
int encodedEntryId =
Integer.parseInt(typedMessageBuilder.getMessage().getValue());
messageSender.complete(
DefaultImplementation.getDefaultImplementation().newMessageId(1,
encodedEntryId, 1));
- }, 5, TimeUnit.MILLISECONDS);
+ }, 100, TimeUnit.MILLISECONDS);
return messageSender;
});
return typedMessageBuilder;
@@ -378,10 +384,11 @@ class AdaptedReactiveMessageSenderTest {
.willReturn(CompletableFuture.completedFuture(producer));
ReactiveMessageSender<String> reactiveSender =
AdaptedReactivePulsarClientFactory.create(pulsarClient)
- .messageSender(Schema.STRING).maxInflight(7).cache(AdaptedReactivePulsarClientFactory.createCache())
- .maxConcurrentSenderSubscriptions(1024).topic("my-topic").build();
+ .messageSender(Schema.STRING).maxInflight(maxInflight)
+ .cache(AdaptedReactivePulsarClientFactory.createCache()).maxConcurrentSenderSubscriptions(1024)
+ .topic("my-topic").build();
- List<Integer> inputValues = IntStream.rangeClosed(1,
1000).boxed().collect(Collectors.toList());
+ List<Integer> inputValues = IntStream.rangeClosed(1,
maxElements).boxed().collect(Collectors.toList());
Flux<Integer> inputFlux =
Flux.fromIterable(inputValues);
Flux<MessageId> outputFlux =
sendingFunction.apply(reactiveSender, inputFlux);
@@ -390,7 +397,7 @@ class AdaptedReactiveMessageSenderTest {
List<Integer> outputValues = outputFlux.map((m) ->
(int) ((MessageIdImpl) m).getEntryId()).collectList()
.block();
assertThat(outputValues).containsExactlyInAnyOrderElementsOf(inputValues);
- assertThat(requestsMax.get()).isEqualTo(7);
+ assertThat(requestsMax.get()).isEqualTo(maxInflight);
}
finally {
if (executorService != null) {