gerlowskija commented on code in PR #61:
URL: https://github.com/apache/solr-sandbox/pull/61#discussion_r1223044484


##########
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java:
##########
@@ -127,58 +125,82 @@ private UpdateRequest createAndOrGetMirrorRequest() {
       log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
   }
 
-  @Override public void processDelete(final DeleteUpdateCommand cmd) throws 
IOException {
-    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+  @Override
+  public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+    String dbqMethod = cmd.getReq().getParams().get("dbqMethod");
+    if (dbqMethod == null) {
+      dbqMethod = defaultDefaultDBQMethod;
+    }
 
-      CloudDescriptor cloudDesc =
-          cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+      CloudDescriptor cloudDesc = 
cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
       String collection = cloudDesc.getCollectionName();
-
       HttpClient httpClient = 
cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
 
-      try (HttpSolrClient client =
-          new 
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
 {
-
+      try (HttpSolrClient client = new 
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
 {
         String uniqueField = 
cmd.getReq().getSchema().getUniqueKeyField().getName();
 
-        int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
-        SolrQuery q = new 
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
-        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
-
-        int cnt = 1;
-        boolean done = false;
-        while (!done) {
-          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
-          QueryResponse rsp =
-              client.query(collection, q);
-          String nextCursorMark = rsp.getNextCursorMark();
-
-          if (log.isDebugEnabled()) {
-            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, 
nextCursorMark, cnt,
-                rsp.getResults());
-            cnt++;
+        if (dbqMethod == null || dbqMethod.equals("default")) {
+
+          int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+          SolrQuery q = new 
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+          String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+          int cnt = 1;
+          boolean done = false;
+          while (!done) {
+            q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+            QueryResponse rsp =
+                    client.query(collection, q);
+            String nextCursorMark = rsp.getNextCursorMark();
+
+            if (log.isDebugEnabled()) {
+              log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", 
cursorMark, nextCursorMark, cnt,
+                      rsp.getResults());
+              cnt++;
+            }
+
+            processDBQResults(client, collection, uniqueField, rsp);
+            if (cursorMark.equals(nextCursorMark)) {
+              done = true;
+            }
+            cursorMark = nextCursorMark;
           }
-
-          processDBQResults(client, collection, uniqueField, rsp);
-          if (cursorMark.equals(nextCursorMark)) {
-            done = true;
+        } else if (dbqMethod.equals("convert_no_paging")) {

Review Comment:
   [0] Minor, but it might be nice to make these `dbqMethod` values an enum, so 
that the set of acceptable values is more explicit.



##########
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java:
##########
@@ -127,58 +125,82 @@ private UpdateRequest createAndOrGetMirrorRequest() {
       log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
   }
 
-  @Override public void processDelete(final DeleteUpdateCommand cmd) throws 
IOException {
-    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+  @Override
+  public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+    String dbqMethod = cmd.getReq().getParams().get("dbqMethod");
+    if (dbqMethod == null) {
+      dbqMethod = defaultDefaultDBQMethod;
+    }
 
-      CloudDescriptor cloudDesc =
-          cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+      CloudDescriptor cloudDesc = 
cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
       String collection = cloudDesc.getCollectionName();
-
       HttpClient httpClient = 
cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
 
-      try (HttpSolrClient client =
-          new 
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
 {
-
+      try (HttpSolrClient client = new 
HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build())
 {
         String uniqueField = 
cmd.getReq().getSchema().getUniqueKeyField().getName();
 
-        int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
-        SolrQuery q = new 
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
-        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
-
-        int cnt = 1;
-        boolean done = false;
-        while (!done) {
-          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
-          QueryResponse rsp =
-              client.query(collection, q);
-          String nextCursorMark = rsp.getNextCursorMark();
-
-          if (log.isDebugEnabled()) {
-            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, 
nextCursorMark, cnt,
-                rsp.getResults());
-            cnt++;
+        if (dbqMethod == null || dbqMethod.equals("default")) {
+
+          int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+          SolrQuery q = new 
SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+          String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+          int cnt = 1;
+          boolean done = false;
+          while (!done) {
+            q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+            QueryResponse rsp =
+                    client.query(collection, q);
+            String nextCursorMark = rsp.getNextCursorMark();
+
+            if (log.isDebugEnabled()) {
+              log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", 
cursorMark, nextCursorMark, cnt,
+                      rsp.getResults());
+              cnt++;
+            }
+
+            processDBQResults(client, collection, uniqueField, rsp);
+            if (cursorMark.equals(nextCursorMark)) {
+              done = true;
+            }
+            cursorMark = nextCursorMark;
           }
-
-          processDBQResults(client, collection, uniqueField, rsp);
-          if (cursorMark.equals(nextCursorMark)) {
-            done = true;
+        } else if (dbqMethod.equals("convert_no_paging")) {
+          int rows = 10000;
+          SolrQuery q = new 
SolrQuery(cmd.query).setRows(rows).setFields(uniqueField);

Review Comment:
   [Q] I wonder if we could use a higher rows value if we fetched the documents 
using the /export handler, rather than /select?
   
   Or even better: a `delete(..., search(...))` streaming expression would use 
the /export handler implicitly, and batch and send the delete-by-ID requests 
for us.  It looks like that `delete` streaming expression was built with this 
specific use case in mind (see 
[SOLR-14241](https://issues.apache.org/jira/browse/SOLR-14241) and the 
[`delete` stream decorator reference](https://solr.apache.org/guide/8_7/stream-decorator-reference.html#delete)).



##########
crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryIntegrationTest.java:
##########
@@ -0,0 +1,428 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+
+@ThreadLeakFilters(defaultFilters = true, filters = 
{SolrIgnoredThreadsFilter.class,
+        QuickPatchThreadsFilter.class, 
SolrKafkaTestsIgnoredThreadsFilter.class})
+@ThreadLeakLingering(linger = 5000)
+public class DeleteByQueryIntegrationTest extends
+        SolrTestCaseJ4 {
+
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private static final int MAX_MIRROR_BATCH_SIZE_BYTES = 
Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
+    private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES;
+
+    static final String VERSION_FIELD = "_version_";
+
+    private static final int NUM_BROKERS = 1;
+    public static EmbeddedKafkaCluster kafkaCluster;
+
+    protected static volatile MiniSolrCloudCluster solrCluster1;
+    protected static volatile MiniSolrCloudCluster solrCluster2;
+
+    protected static volatile Consumer consumer = new Consumer();
+
+    private static String TOPIC = "topic1";
+
+    private static String COLLECTION = "collection1";
+    private static String ALT_COLLECTION = "collection2";
+
+    @BeforeClass
+    public static void beforeSolrAndKafkaIntegrationTest() throws Exception {

Review Comment:
   [Q] Is there a reason this test class doesn't extend 
`SolrAndKafkaIntegrationTest`?  It may differ in subtle ways, but from a skim 
it looks like we're duplicating a lot of logic from that base class here.



##########
crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java:
##########
@@ -249,8 +249,10 @@ public UpdateRequestProcessor getInstance(final 
SolrQueryRequest req, final Solr
             log.trace("Create MirroringUpdateProcessor with 
mirroredParams={}", mirroredParams);
         }
 
+        String defaultDefaultDBQMethod = 
conf.get(KafkaCrossDcConf.DEFAULT_DBQ_METHOD);

Review Comment:
   [0] Dumb question, but should all these variable names really start with 
"defaultDefault" instead of just "default"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to