rzo1 commented on code in PR #1700:
URL: https://github.com/apache/stormcrawler/pull/1700#discussion_r2446943171
##########
external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/StatusBoltTest.java:
##########
@@ -128,23 +124,27 @@ private Future<Integer> store(String url, Status status,
Metadata metadata) {
}
@Test
- @Timeout(value = 2, unit = TimeUnit.MINUTES)
- // see https://github.com/apache/stormcrawler/issues/885
- void checkListKeyFromOpensearch()
- throws IOException, ExecutionException, InterruptedException,
TimeoutException {
- String url = "https://www.url.net/something";
- Metadata md = new Metadata();
- md.addValue("someKey", "someValue");
- store(url, Status.DISCOVERED, md).get(10, TimeUnit.SECONDS);
- assertEquals(1, output.getAckedTuples().size());
- // check output in Opensearch?
- String id = org.apache.commons.codec.digest.DigestUtils.sha256Hex(url);
- GetResponse result = client.get(new GetRequest("status", id),
RequestOptions.DEFAULT);
- Map<String, Object> sourceAsMap = result.getSourceAsMap();
- final String pfield = "metadata.someKey";
- sourceAsMap = (Map<String, Object>) sourceAsMap.get("metadata");
- final var pfieldNew = pfield.substring(9);
- Object key = sourceAsMap.get(pfieldNew);
- assertTrue(key instanceof java.util.ArrayList);
+ public void testWaitAckCacheSpecAppliedFromConfig() throws Exception {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("opensearch.status.waitack.cache.spec",
"maximumSize=10,expireAfterWrite=1s");
+ conf.put("opensearch.status.routing.fieldname", "metadata.key");
+ conf.put("scheduler.class",
"org.apache.stormcrawler.persistence.DefaultScheduler");
+
+ TopologyContext mockContext = Mockito.mock(TopologyContext.class);
+ OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
+
+ StatusUpdaterBolt bolt = new StatusUpdaterBolt();
+ try {
+ bolt.prepare(conf, mockContext, mockCollector);
+ } catch (RuntimeException e) {
+ // 연결 실패 시 예외 분기 발생 → Jacoco branch coverage 확보
Review Comment:
Can we drop the non-English comments here, please?
##########
external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java:
##########
@@ -160,11 +159,13 @@ public void prepare(
fieldNameForRoutingKey = fieldNameForRoutingKey.replaceAll("\\.",
"%2E");
}
- waitAck =
- Caffeine.newBuilder()
- .expireAfterWrite(60, TimeUnit.SECONDS)
- .removalListener(this)
- .build();
+ String waitAckSpec =
+ ConfUtils.getString(
+ stormConf,
+ "opensearch.status.waitack.cache.spec",
+ "maximumSize=10000,expireAfterWrite=60s");
Review Comment:
See the comment above; this could be inlined in `Caffeine.from(...)` as well.
##########
external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/StatusBoltTest.java:
##########
@@ -16,22 +16,21 @@
*/
package org.apache.stormcrawler.opensearch.bolt;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.Assert.*;
Review Comment:
Please avoid star imports, here and in the other occurrences.
##########
external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/StatusBoltTest.java:
##########
@@ -128,23 +124,27 @@ private Future<Integer> store(String url, Status status,
Metadata metadata) {
}
@Test
- @Timeout(value = 2, unit = TimeUnit.MINUTES)
- // see https://github.com/apache/stormcrawler/issues/885
- void checkListKeyFromOpensearch()
- throws IOException, ExecutionException, InterruptedException,
TimeoutException {
- String url = "https://www.url.net/something";
- Metadata md = new Metadata();
- md.addValue("someKey", "someValue");
- store(url, Status.DISCOVERED, md).get(10, TimeUnit.SECONDS);
- assertEquals(1, output.getAckedTuples().size());
- // check output in Opensearch?
- String id = org.apache.commons.codec.digest.DigestUtils.sha256Hex(url);
- GetResponse result = client.get(new GetRequest("status", id),
RequestOptions.DEFAULT);
- Map<String, Object> sourceAsMap = result.getSourceAsMap();
- final String pfield = "metadata.someKey";
- sourceAsMap = (Map<String, Object>) sourceAsMap.get("metadata");
- final var pfieldNew = pfield.substring(9);
- Object key = sourceAsMap.get(pfieldNew);
- assertTrue(key instanceof java.util.ArrayList);
+ public void testWaitAckCacheSpecAppliedFromConfig() throws Exception {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("opensearch.status.waitack.cache.spec",
"maximumSize=10,expireAfterWrite=1s");
+ conf.put("opensearch.status.routing.fieldname", "metadata.key");
+ conf.put("scheduler.class",
"org.apache.stormcrawler.persistence.DefaultScheduler");
+
+ TopologyContext mockContext = Mockito.mock(TopologyContext.class);
+ OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
+
+ StatusUpdaterBolt bolt = new StatusUpdaterBolt();
+ try {
+ bolt.prepare(conf, mockContext, mockCollector);
+ } catch (RuntimeException e) {
+ // 연결 실패 시 예외 분기 발생 → Jacoco branch coverage 확보
+ assertTrue(e.getMessage().contains("Can't connect"));
+ }
+
+ Field field = StatusUpdaterBolt.class.getDeclaredField("waitAck");
Review Comment:
What are we trying to achieve here? Why do we check for the class name? If
the class can't be initialized, the bolt is useless anyway (so we should fail
early), so there is no need for such a test imho.
##########
external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/bolt/StatusBoltTest.java:
##########
@@ -128,23 +124,27 @@ private Future<Integer> store(String url, Status status,
Metadata metadata) {
}
@Test
- @Timeout(value = 2, unit = TimeUnit.MINUTES)
Review Comment:
Why was this test removed? If it fails, we should fix it (or explain why
it was dropped).
##########
external/opensearch/opensearch-conf.yaml:
##########
@@ -114,6 +114,8 @@ config:
opensearch.status.recentDate.increase: -1
opensearch.status.recentDate.min.gap: -1
+ # Caffeine cache specification for the waitAck cache used in
StatusUpdaterBolt.
+ opensearch.status.waitack.cache.spec:
"maximumSize=10000,expireAfterWrite=60s"
Review Comment:
Can we set `expireAfterWrite` to the value of `topology.message.timeout.secs`
(default: 300) here and leave a comment, e.g.
```
# Set expireAfterWrite to match topology.message.timeout.secs (default: 300s)
# so entries persist as long as messages may remain unacked.
```
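To illustrate the idea, here is a minimal sketch of deriving the spec string from the message timeout instead of hard-coding 60s. The class `WaitAckSpecSketch` and the helper `waitAckSpec` are hypothetical names for illustration only and are not part of StormCrawler; the 300s fallback is the default quoted above, and the conf map is assumed to hold the timeout as a number.

```java
import java.util.HashMap;
import java.util.Map;

public class WaitAckSpecSketch {

    // Fallback taken from the default quoted in the review comment (an assumption here).
    static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 300;

    /**
     * Hypothetical helper: builds a Caffeine-style spec string whose
     * expireAfterWrite matches topology.message.timeout.secs.
     */
    static String waitAckSpec(Map<String, Object> conf, long maximumSize) {
        Object raw = conf.get("topology.message.timeout.secs");
        // Use the configured timeout when it is numeric, otherwise the fallback.
        int timeoutSecs =
                (raw instanceof Number)
                        ? ((Number) raw).intValue()
                        : DEFAULT_MESSAGE_TIMEOUT_SECS;
        return "maximumSize=" + maximumSize + ",expireAfterWrite=" + timeoutSecs + "s";
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // No timeout configured: prints maximumSize=10000,expireAfterWrite=300s
        System.out.println(waitAckSpec(conf, 10000));

        conf.put("topology.message.timeout.secs", 120);
        // Timeout configured: prints maximumSize=10000,expireAfterWrite=120s
        System.out.println(waitAckSpec(conf, 10000));
    }
}
```

The resulting string has the same shape as the value in `opensearch-conf.yaml`, so entries would expire only once a tuple could no longer be acked.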
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.