gerlowskija commented on code in PR #61: URL: https://github.com/apache/solr-sandbox/pull/61#discussion_r1223044484
########## crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java: ########## @@ -127,58 +125,82 @@ private UpdateRequest createAndOrGetMirrorRequest() { log.debug("processAdd isLeader={} cmd={}", isLeader, cmd); } - @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException { - if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) { + @Override + public void processDelete(final DeleteUpdateCommand cmd) throws IOException { + String dbqMethod = cmd.getReq().getParams().get("dbqMethod"); + if (dbqMethod == null) { + dbqMethod = defaultDefaultDBQMethod; + } - CloudDescriptor cloudDesc = - cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor(); + if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) { + CloudDescriptor cloudDesc = cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor(); String collection = cloudDesc.getCollectionName(); - HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient(); - try (HttpSolrClient client = - new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) { - + try (HttpSolrClient client = new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) { String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName(); - int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000); - SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField); - String cursorMark = CursorMarkParams.CURSOR_MARK_START; - - int cnt = 1; - boolean done = false; - while (!done) { - q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); - QueryResponse rsp = - client.query(collection, q); - String nextCursorMark = rsp.getNextCursorMark(); - - if (log.isDebugEnabled()) { - 
log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt, - rsp.getResults()); - cnt++; + if (dbqMethod == null || dbqMethod.equals("default")) { + + int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000); + SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField); + String cursorMark = CursorMarkParams.CURSOR_MARK_START; + + int cnt = 1; + boolean done = false; + while (!done) { + q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); + QueryResponse rsp = + client.query(collection, q); + String nextCursorMark = rsp.getNextCursorMark(); + + if (log.isDebugEnabled()) { + log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt, + rsp.getResults()); + cnt++; + } + + processDBQResults(client, collection, uniqueField, rsp); + if (cursorMark.equals(nextCursorMark)) { + done = true; + } + cursorMark = nextCursorMark; } - - processDBQResults(client, collection, uniqueField, rsp); - if (cursorMark.equals(nextCursorMark)) { - done = true; + } else if (dbqMethod.equals("convert_no_paging")) { Review Comment: [0] Hardly matters, but it might be nice to have these constants be an enum so the set of acceptable values is a little more explicit. 
########## crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java: ########## @@ -127,58 +125,82 @@ private UpdateRequest createAndOrGetMirrorRequest() { log.debug("processAdd isLeader={} cmd={}", isLeader, cmd); } - @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException { - if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) { + @Override + public void processDelete(final DeleteUpdateCommand cmd) throws IOException { + String dbqMethod = cmd.getReq().getParams().get("dbqMethod"); + if (dbqMethod == null) { + dbqMethod = defaultDefaultDBQMethod; + } - CloudDescriptor cloudDesc = - cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor(); + if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) { + CloudDescriptor cloudDesc = cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor(); String collection = cloudDesc.getCollectionName(); - HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient(); - try (HttpSolrClient client = - new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) { - + try (HttpSolrClient client = new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) { String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName(); - int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000); - SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField); - String cursorMark = CursorMarkParams.CURSOR_MARK_START; - - int cnt = 1; - boolean done = false; - while (!done) { - q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); - QueryResponse rsp = - client.query(collection, q); - String nextCursorMark = rsp.getNextCursorMark(); - - if (log.isDebugEnabled()) { - 
log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt, - rsp.getResults()); - cnt++; + if (dbqMethod == null || dbqMethod.equals("default")) { + + int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000); + SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField); + String cursorMark = CursorMarkParams.CURSOR_MARK_START; + + int cnt = 1; + boolean done = false; + while (!done) { + q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); + QueryResponse rsp = + client.query(collection, q); + String nextCursorMark = rsp.getNextCursorMark(); + + if (log.isDebugEnabled()) { + log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt, + rsp.getResults()); + cnt++; + } + + processDBQResults(client, collection, uniqueField, rsp); + if (cursorMark.equals(nextCursorMark)) { + done = true; + } + cursorMark = nextCursorMark; } - - processDBQResults(client, collection, uniqueField, rsp); - if (cursorMark.equals(nextCursorMark)) { - done = true; + } else if (dbqMethod.equals("convert_no_paging")) { + int rows = 10000; + SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setFields(uniqueField); Review Comment: [Q] I wonder if we could use a higher rows value if we fetched the documents using the /export handler, rather than /select? Or even better: a `delete(..., search(...))` streaming expression would use the /export handler implicitly, and batch and send the delete-by-ID requests for us. It looks like that `delete` streaming expression was built with this specific use case in mind. 
(see [here](https://issues.apache.org/jira/browse/SOLR-14241) and [here](https://solr.apache.org/guide/8_7/stream-decorator-reference.html#delete) ) ########## crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryIntegrationTest.java: ########## @@ -0,0 +1,428 @@ +package org.apache.solr.crossdc; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.lucene.util.QuickPatchThreadsFilter; +import org.apache.solr.SolrIgnoredThreadsFilter; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.MiniSolrCloudCluster; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ObjectReleaseTracker; +import org.apache.solr.crossdc.common.KafkaCrossDcConf; +import org.apache.solr.crossdc.consumer.Consumer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE; + +@ThreadLeakFilters(defaultFilters = true, filters = 
{SolrIgnoredThreadsFilter.class, + QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class}) +@ThreadLeakLingering(linger = 5000) +public class DeleteByQueryIntegrationTest extends + SolrTestCaseJ4 { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int MAX_MIRROR_BATCH_SIZE_BYTES = Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE); + private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES; + + static final String VERSION_FIELD = "_version_"; + + private static final int NUM_BROKERS = 1; + public static EmbeddedKafkaCluster kafkaCluster; + + protected static volatile MiniSolrCloudCluster solrCluster1; + protected static volatile MiniSolrCloudCluster solrCluster2; + + protected static volatile Consumer consumer = new Consumer(); + + private static String TOPIC = "topic1"; + + private static String COLLECTION = "collection1"; + private static String ALT_COLLECTION = "collection2"; + + @BeforeClass + public static void beforeSolrAndKafkaIntegrationTest() throws Exception { Review Comment: [Q] Is there a reason this test class doesn't extend `SolrAndKafkaIntegrationTest`? Maybe it differs in subtle ways, but from a skim it seems like we're duplicating a lot of logic from that base class here? ########## crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java: ########## @@ -249,8 +249,10 @@ public UpdateRequestProcessor getInstance(final SolrQueryRequest req, final Solr log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams); } + String defaultDefaultDBQMethod = conf.get(KafkaCrossDcConf.DEFAULT_DBQ_METHOD); Review Comment: [0] Dumb question, but should all these variable names really start with "defaultDefault" instead of just "default"? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For additional commands, e-mail: issues-h...@solr.apache.org