This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 3725e23  Test sender maxInflight with multiple values (#93)
3725e23 is described below

commit 3725e23f2e4598e99284ff827bf5bfea1973e40f
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Dec 7 16:55:04 2022 +0100

    Test sender maxInflight with multiple values (#93)
---
 .../adapter/AdaptedReactiveMessageSenderTest.java  | 35 +++++++++++++---------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index 43f73dd..163d166 100644
--- 
a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ 
b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -58,6 +58,7 @@ import 
org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
@@ -328,20 +329,25 @@ class AdaptedReactiveMessageSenderTest {
                assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4), 
Duration.ofSeconds(5));
        }
 
-       @Test
-       void maxInFlightUsingSendOne() throws Exception {
-               doTestMaxInFlight((reactiveSender, inputFlux) -> inputFlux
-                               .flatMap((i) -> 
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100));
+       @ParameterizedTest
+       @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+       void maxInFlightUsingSendOne(int maxInflight, int maxElements) throws 
Exception {
+               doTestMaxInFlight(
+                               (reactiveSender, inputFlux) -> inputFlux
+                                               .flatMap((i) -> 
reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100),
+                               maxInflight, maxElements);
        }
 
-       @Test
-       void maxInFlightUsingSendMany() throws Exception {
+       @ParameterizedTest
+       @CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
+       void maxInFlightUsingSendMany(int maxInflight, int maxElements) throws 
Exception {
                doTestMaxInFlight((reactiveSender, inputFlux) -> 
inputFlux.window(3).flatMap(
-                               (subFlux) -> subFlux.map((i) -> 
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100));
+                               (subFlux) -> subFlux.map((i) -> 
MessageSpec.of(String.valueOf(i))).as(reactiveSender::sendMany), 100),
+                               maxInflight, maxElements);
        }
 
-       void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, 
Flux<Integer>, Flux<MessageId>> sendingFunction)
-                       throws Exception {
+       void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, 
Flux<Integer>, Flux<MessageId>> sendingFunction,
+                       int maxInflight, int maxElements) throws Exception {
                ScheduledExecutorService executorService = null;
                try {
                        executorService = 
Executors.newSingleThreadScheduledExecutor();
@@ -368,7 +374,7 @@ class AdaptedReactiveMessageSenderTest {
                                                int encodedEntryId = 
Integer.parseInt(typedMessageBuilder.getMessage().getValue());
                                                messageSender.complete(
                                                                
DefaultImplementation.getDefaultImplementation().newMessageId(1, 
encodedEntryId, 1));
-                                       }, 5, TimeUnit.MILLISECONDS);
+                                       }, 100, TimeUnit.MILLISECONDS);
                                        return messageSender;
                                });
                                return typedMessageBuilder;
@@ -378,10 +384,11 @@ class AdaptedReactiveMessageSenderTest {
                                        
.willReturn(CompletableFuture.completedFuture(producer));
 
                        ReactiveMessageSender<String> reactiveSender = 
AdaptedReactivePulsarClientFactory.create(pulsarClient)
-                                       
.messageSender(Schema.STRING).maxInflight(7).cache(AdaptedReactivePulsarClientFactory.createCache())
-                                       
.maxConcurrentSenderSubscriptions(1024).topic("my-topic").build();
+                                       
.messageSender(Schema.STRING).maxInflight(maxInflight)
+                                       
.cache(AdaptedReactivePulsarClientFactory.createCache()).maxConcurrentSenderSubscriptions(1024)
+                                       .topic("my-topic").build();
 
-                       List<Integer> inputValues = IntStream.rangeClosed(1, 
1000).boxed().collect(Collectors.toList());
+                       List<Integer> inputValues = IntStream.rangeClosed(1, 
maxElements).boxed().collect(Collectors.toList());
 
                        Flux<Integer> inputFlux = 
Flux.fromIterable(inputValues);
                        Flux<MessageId> outputFlux = 
sendingFunction.apply(reactiveSender, inputFlux);
@@ -390,7 +397,7 @@ class AdaptedReactiveMessageSenderTest {
                        List<Integer> outputValues = outputFlux.map((m) -> 
(int) ((MessageIdImpl) m).getEntryId()).collectList()
                                        .block();
                        
assertThat(outputValues).containsExactlyInAnyOrderElementsOf(inputValues);
-                       assertThat(requestsMax.get()).isEqualTo(7);
+                       assertThat(requestsMax.get()).isEqualTo(maxInflight);
                }
                finally {
                        if (executorService != null) {

Reply via email to