JiriOndrusek commented on code in PR #5184:
URL: https://github.com/apache/camel-quarkus/pull/5184#discussion_r1294304159


##########
integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java:
##########
@@ -16,138 +16,153 @@
  */
 package org.apache.camel.quarkus.component.splunk.it;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
-import io.restassured.common.mapper.TypeRef;
 import io.restassured.http.ContentType;
+import org.apache.camel.component.splunk.ProducerType;
 import org.apache.camel.util.CollectionHelper;
 import org.eclipse.microprofile.config.ConfigProvider;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
 
 @QuarkusTest
 @QuarkusTestResource(SplunkTestResource.class)
 class SplunkTest {
 
     @Test
-    public void testWriteTcpAndReadNormal() {
-        write("_normal", SplunkTestResource.TEST_INDEX, "tcp");
-
-        List<Map<String, String>> result = RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .body(String.format(
-                        "search index=%s sourcetype=%s | rex field=_raw 
\"Name: (?<name>.*) From: (?<from>.*)\"",
-                        SplunkTestResource.TEST_INDEX, 
SplunkResource.SOURCE_TYPE))
-                .post("/splunk/normal")
-                .then()
-                .statusCode(200)
-                .extract().as(new TypeRef<>() {
-                });
+    public void testNormalSearchWithSubmitWithRawData() {
+        String suffix = "_normalSearchOfSubmit";
 
-        Assertions.assertEquals(3, result.size());
-        Assertions.assertEquals("Irma_normal", result.get(0).get("name"));
-        Assertions.assertEquals("Earth\"", result.get(0).get("from"));
-        Assertions.assertEquals("Leonard_normal", result.get(1).get("name"));
-        Assertions.assertEquals("Earth 2.0\"", result.get(1).get("from"));
-        Assertions.assertEquals("Sheldon_normal", result.get(2).get("name"));
-        Assertions.assertEquals("Alpha Centauri\"", result.get(2).get("from"));
-    }
+        write(suffix, ProducerType.SUBMIT, 0, true);
 
-    @Test
-    public void testWriteSubmitAndReadRealtime() throws InterruptedException, 
ExecutionException {
+        Awaitility.await().pollInterval(1000, 
TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+                () -> {
 
-        RestAssured.given()
-                .body(String.format(
-                        "search index=%s sourcetype=%s | rex field=_raw 
\"Name: (?<name>.*) From: (?<from>.*)\"",
-                        SplunkTestResource.TEST_INDEX, 
SplunkResource.SOURCE_TYPE))
-                .post("/splunk/startRealtimePolling");
-
-        //wait some time to start polling
-        TimeUnit.SECONDS.sleep(3);
-        write("_realtime1", SplunkTestResource.TEST_INDEX, "submit");
-        TimeUnit.SECONDS.sleep(1);
-        write("_realtime2", SplunkTestResource.TEST_INDEX, "submit");
-        TimeUnit.SECONDS.sleep(1);
-        write("_realtime3", SplunkTestResource.TEST_INDEX, "submit");
-        //wait some time to gather the pulls from splunk server
-        TimeUnit.SECONDS.sleep(3);
-        //there should be some data from realtime search in direct (concrete 
values depends on the speed of writing into index)
-        //test is asserting that there are some
-        RestAssured.get("/splunk/directRealtimePolling")
-                .then()
-                .statusCode(200)
-                .body(containsString("_realtime"));
+                    String result = RestAssured.given()
+                            .contentType(ContentType.TEXT)
+                            .post("/splunk/results/normalSearch")
+                            .then()
+                            .statusCode(200)
+                            .extract().asString();
+
+                    return result.contains("Name: Sheldon" + suffix)
+                            && result.contains("Name: Leonard" + suffix)
+                            && result.contains("Name: Irma" + suffix);
+                });
     }
 
     @Test
-    public void testWriteStreamAndReadSaved() throws InterruptedException {
-        int defaultPort = RestAssured.port;
-        String defaultUri = RestAssured.baseURI;
-
+    public void testSavedSearchWithTcp() throws InterruptedException {
+        String suffix = "_SavedSearchOfTcp";
         //create saved search
         RestAssured.given()
                 .baseUri("http://localhost")
                 
.port(ConfigProvider.getConfig().getValue(SplunkResource.PARAM_REMOTE_PORT, 
Integer.class))
                 .contentType(ContentType.JSON)
-                .param("name", SplunkTestResource.SAVED_SEARCH_NAME)
+                .param("name", SplunkResource.SAVED_SEARCH_NAME)
                 .param("disabled", "0")
-                .param("description", "descritionText")
+                .param("description", "descriptionText")
                 .param("search",
-                        "index=" + SplunkTestResource.TEST_INDEX + " 
sourcetype=" + SplunkResource.SOURCE_TYPE)
+                        "sourcetype=\"TCP\" | rex field=_raw \"Name: 
(?<name>.*) From: (?<from>.*)\"")
                 .post("/services/saved/searches")
                 .then()
                 .statusCode(anyOf(is(201), is(409)));
-        write("_s", SplunkTestResource.TEST_INDEX, "stream");
 
-        RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .body(SplunkTestResource.SAVED_SEARCH_NAME)
-                .post("/splunk/savedSearch")
-                .then()
-                .statusCode(200)
-                .body(containsString("Name: Sheldon_s"))
-                .body(containsString("Name: Leonard_s"))
-                .body(containsString("Name: Irma_s"));
-    }
-
-    private void write(String suffix, String index, String endpoint) {
-        write(CollectionHelper.mapOf("entity", "Name: Sheldon" + suffix + " 
From: Alpha Centauri"), "submit",
-                index);
-        write(CollectionHelper.mapOf("entity", "Name: Leonard" + suffix + " 
From: Earth 2.0"), "submit",
-                index);
-        write(CollectionHelper.mapOf("entity", "Name: Irma" + suffix + " From: 
Earth"), "submit", index);
+        //write data via tcp
+        write(suffix, ProducerType.TCP, 0, false);
+
+        //there might by delay in receiving the data
+        Awaitility.await().pollInterval(1000, 
TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+                () -> {
+                    String result = RestAssured.given()
+                            .contentType(ContentType.TEXT)
+                            .post("/splunk/results/savedSearch")
+                            .then()
+                            .statusCode(200)
+                            .extract().asString();
+
+                    return result.contains("Name: Sheldon" + suffix)
+                            && result.contains("Name: Leonard" + suffix)
+                            && result.contains("Name: Irma" + suffix);
+                });
     }
 
-    private void write(Map<String, String> data, String endpoint, String 
index) {
+    @Test
+    public void testStreamForRealtime() throws InterruptedException, 
ExecutionException {
+        String suffix = "_RealtimeSearchOfStream";
+        //there is a buffer for stream writing, therefore about 1MB of data 
has to be written into Splunk
+
+        //data are written in separated thread
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        //execute component server to wait for the result
+        Future futureResult = executor.submit(
+                () -> {
+                    for (int i = 0; i < 5000; i++) {
+                        write(suffix + i, ProducerType.STREAM, 100, false);
+                    }
+                });
 
-        String expectedResult = expectedResult(data);
+        try {
+            Awaitility.await().pollInterval(1000, 
TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(
+                    () -> {
+
+                        String result = RestAssured.given()
+                                .contentType(ContentType.TEXT)
+                                .post("/splunk/results/realtimeSearch")
+                                .then()
+                                .statusCode(200)
+                                .extract().asString();
+
+                        return result.contains("Name: Sheldon" + suffix)
+                                && result.contains("Name: Leonard" + suffix)
+                                && result.contains("Name: Irma" + suffix);
+                    });
+        } finally {
+            futureResult.cancel(true);
+        }
+    }
 
-        RestAssured.given()
+    private void write(String suffix, ProducerType producerType, int 
lengthOfRandomString, boolean raw) {
+        Consumer<Map> write = data -> RestAssured.given()
                 .contentType(ContentType.JSON)
-                .queryParam("index", index)
+                .queryParam("index", SplunkTestResource.TEST_INDEX)
                 .body(data)
-                .post("/splunk/" + endpoint)
+                .post("/splunk/write/" + producerType.name())
                 .then()
-                .statusCode(201)
-                .body(containsString(expectedResult));
+                .statusCode(201);
+
+        Map data[] = new Map[3];

Review Comment:
   TBH, I just thought that it is not necessary to verify the results of the
insert, as status code 201 means that the insert was done (the data is then
verified in the tests), but I can add it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to