ctubbsii commented on code in PR #2609:
URL: https://github.com/apache/accumulo/pull/2609#discussion_r848568550


##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -686,15 +686,29 @@ private void setUpdateTablet(UpdateSession us, KeyExtent 
keyExtent) {
 
   @Override
   public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
-      List<TMutation> tmutations) {
+      List<TMutation> tmutations) throws TException {
     UpdateSession us = (UpdateSession) 
server.sessionManager.reserveSession(updateID);
     if (us == null) {
       return;
     }
 
+    Optional<Semaphore> writeThreadSemaphore = Optional.empty();
     boolean reserved = true;
+
     try {
       KeyExtent keyExtent = KeyExtent.fromThrift(tkeyExtent);
+
+      if (TabletType.type(keyExtent) == TabletType.USER) {
+        writeThreadSemaphore = server.getSemaphore();
+        // if write thread max is configured, get the Semaphore, otherwise do 
nothing
+        if (writeThreadSemaphore.isPresent()) {
+          Semaphore sem = writeThreadSemaphore.get();
+          if (sem.tryAcquire()) {
+            log.trace("Available permits: {}", sem.availablePermits());
+          } else
+            throw new TException("Mutation failed. No threads available.");

Review Comment:
   Do we really want to fail if no threads are available? Wouldn't we rather 
block until a permit becomes available? If this fails, what happens to the 
client? Does this exception reach the client? Could that cause a lot of 
unnecessary RPC I/O while the client retries?



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +402,23 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Optional<Semaphore> getSemaphore() {

Review Comment:
   This method should have a more descriptive name that indicates what the 
Semaphore is for (e.g. `getWriteThreadSemaphore`). It should also have a javadoc 
explaining when the returned Optional is empty.



##########
test/src/main/java/org/apache/accumulo/test/functional/WriteThreadsIT.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.TSERV_WRITE_THREADS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteThreadsIT extends AccumuloClusterHarness {
+
+  ThreadPoolExecutor tpe;
+  private static final Logger log = 
LoggerFactory.getLogger(WriteThreadsIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    // sets the thread limit on the SERVER SIDE
+    // default value is 0. when set to 0, there is no limit
+    cfg.setProperty(TSERV_WRITE_THREADS_MAX.getKey(), "10");
+  }
+
+  @Test
+  public void test() throws Exception {
+    write();
+  }
+
+  public void write() throws Exception {
+    // each thread creates a batch writer, adds mutations, and then flushes.
+    final int threadCount = 100;
+
+    // Reads and writes from Accumulo
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxWriteThreads(1_000); // this is the max write threads on the 
CLIENT SIDE
+
+    try (AccumuloClient client =
+        
Accumulo.newClient().from(getClientProps()).batchWriterConfig(config).build()) {
+
+      tpe = new ThreadPoolExecutor(threadCount, threadCount, 0, 
TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(threadCount));
+
+      final String[] tables = getUniqueNames(threadCount);
+
+      for (int i = 0; i < threadCount; i++) {
+        if (i % 10 == 0)
+          log.debug("iteration: {}", i);
+
+        final String tableName = tables[i];
+        client.tableOperations().create(tableName);
+
+        Runnable r = () -> {
+          try (BatchWriter writer = client.createBatchWriter(tableName)) {
+
+            // Data is written to mutation objects
+            List<Mutation> mutations = Stream.of("row1", "row2", "row3", 
"row4", "row5")
+                .map(Mutation::new).collect(Collectors.toList());
+
+            for (Mutation mutation : mutations) {
+              
mutation.at().family("myColFam").qualifier("myColQual").put("myValue1");
+              
mutation.at().family("myColFam").qualifier("myColQual").put("myValue2");
+              writer.addMutation(mutation);
+            }
+
+            // Sends buffered mutation to Accumulo immediately (write executed)
+            writer.flush();
+          } catch (Exception e) {
+            log.error("ERROR WRITING MUTATION", e);
+          }
+        };
+        // Runnable above is executed
+        tpe.execute(r);
+      }
+
+      tpe.shutdown();
+      assertTrue(tpe.awaitTermination(90, TimeUnit.SECONDS));
+
+      validateData(client, threadCount, tables);
+    }
+  }
+
+  private void validateData(AccumuloClient client, int threadCount, String[] 
tables)
+      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
+    for (int i = 0; i < threadCount; i++) {
+      try (var batchScanner = client.createBatchScanner(tables[i])) {
+        batchScanner.setRanges(List.of(new Range()));
+
+        List<String> rows = new ArrayList<>();
+        for (var e : batchScanner) {
+          rows.add(e.getKey().getRow().toString());
+        }

Review Comment:
   ```suggestion
           batchScanner.forEach((k, v) -> rows.add(k.getRow().toString()));
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +402,23 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Optional<Semaphore> getSemaphore() {
+    int configuredWriteThreadsMax =
+        
getServerConfig().getConfiguration().getCount(Property.TSERV_WRITE_THREADS_MAX);
+    if (configuredWriteThreadsMax == MAX_WRITE_THREADS_DEFAULT) {
+      return Optional.empty();
+    } else {
+      synchronized (this) {
+        // if the value has changed, create a new semaphore
+        if (maxThreadPermits != configuredWriteThreadsMax) {
+          maxThreadPermits = configuredWriteThreadsMax;
+          writeThreadSemaphore = Optional.of(new Semaphore(maxThreadPermits));

Review Comment:
   When this configuration changes, permits on the old Semaphore could still be 
held by in-flight writes. Replacing the Semaphore means those writes are no longer 
counted, so the number of concurrent write threads could exceed the new limit 
(possibly by a lot) during this overlap.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -394,6 +402,23 @@ private static long jitter() {
         * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
   }
 
+  public Optional<Semaphore> getSemaphore() {
+    int configuredWriteThreadsMax =
+        
getServerConfig().getConfiguration().getCount(Property.TSERV_WRITE_THREADS_MAX);
+    if (configuredWriteThreadsMax == MAX_WRITE_THREADS_DEFAULT) {
+      return Optional.empty();

Review Comment:
   I don't understand the purpose of this variable. We don't want to return 
empty when the value equals some arbitrary constant; we only want to return empty 
when it's `0`, which is the documented "no limit" value. I don't think we need this 
`MAX_WRITE_THREADS_DEFAULT` variable, or the added utility code used to read the 
default value.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -377,6 +383,8 @@ public void run() {
     } else {
       authKeyWatcher = null;
     }
+    MAX_WRITE_THREADS_DEFAULT =
+        
getServerConfig().getConfiguration().getDefaultCount(Property.TSERV_WRITE_THREADS_MAX);

Review Comment:
   I don't understand setting this. What's the difference between this new 
method you added and 
`DefaultConfiguration.getInstance().getCount(Property.TSERV_WRITE_THREADS_MAX)`?



##########
test/src/main/java/org/apache/accumulo/test/functional/WriteThreadsIT.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.TSERV_WRITE_THREADS_MAX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WriteThreadsIT extends AccumuloClusterHarness {
+
+  ThreadPoolExecutor tpe;
+  private static final Logger log = 
LoggerFactory.getLogger(WriteThreadsIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    // sets the thread limit on the SERVER SIDE
+    // default value is 0. when set to 0, there is no limit
+    cfg.setProperty(TSERV_WRITE_THREADS_MAX.getKey(), "10");
+  }
+
+  @Test
+  public void test() throws Exception {
+    write();
+  }
+
+  public void write() throws Exception {
+    // each thread creates a batch writer, adds mutations, and then flushes.
+    final int threadCount = 100;
+
+    // Reads and writes from Accumulo
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxWriteThreads(1_000); // this is the max write threads on the 
CLIENT SIDE
+
+    try (AccumuloClient client =
+        
Accumulo.newClient().from(getClientProps()).batchWriterConfig(config).build()) {
+
+      tpe = new ThreadPoolExecutor(threadCount, threadCount, 0, 
TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(threadCount));
+
+      final String[] tables = getUniqueNames(threadCount);
+
+      for (int i = 0; i < threadCount; i++) {
+        if (i % 10 == 0)
+          log.debug("iteration: {}", i);
+
+        final String tableName = tables[i];
+        client.tableOperations().create(tableName);
+
+        Runnable r = () -> {
+          try (BatchWriter writer = client.createBatchWriter(tableName)) {
+
+            // Data is written to mutation objects
+            List<Mutation> mutations = Stream.of("row1", "row2", "row3", 
"row4", "row5")
+                .map(Mutation::new).collect(Collectors.toList());
+
+            for (Mutation mutation : mutations) {
+              
mutation.at().family("myColFam").qualifier("myColQual").put("myValue1");
+              
mutation.at().family("myColFam").qualifier("myColQual").put("myValue2");
+              writer.addMutation(mutation);
+            }
+
+            // Sends buffered mutation to Accumulo immediately (write executed)
+            writer.flush();
+          } catch (Exception e) {
+            log.error("ERROR WRITING MUTATION", e);
+          }
+        };
+        // Runnable above is executed
+        tpe.execute(r);
+      }
+
+      tpe.shutdown();
+      assertTrue(tpe.awaitTermination(90, TimeUnit.SECONDS));
+
+      validateData(client, threadCount, tables);
+    }
+  }
+
+  private void validateData(AccumuloClient client, int threadCount, String[] 
tables)
+      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
+    for (int i = 0; i < threadCount; i++) {
+      try (var batchScanner = client.createBatchScanner(tables[i])) {
+        batchScanner.setRanges(List.of(new Range()));
+
+        List<String> rows = new ArrayList<>();
+        for (var e : batchScanner) {
+          rows.add(e.getKey().getRow().toString());
+        }
+        assertEquals(5, rows.size(), "Wrong number of rows returned.");
+        assertTrue(rows.containsAll(List.of("row1", "row2", "row3", "row4", 
"row5")));

Review Comment:
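   I think a single `assertEquals` might make more sense than checking the size 
and then `containsAll`. Since a BatchScanner returns entries in no particular 
order, sort `rows` first and then compare it with 
`assertEquals(List.of("row1", "row2", "row3", "row4", "row5"), rows)`.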



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to