This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 4c23acbb54 Conditional writer creation now always includes the client properties (#4732) 4c23acbb54 is described below commit 4c23acbb54b4cffffbadc7a57e079a9476b82276 Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Wed Jul 10 15:04:31 2024 -0400 Conditional writer creation now always includes the client properties (#4732) Closes #4708 - Fixed ClientContext.createConditionalWriter(String) to include client props - Added testCreateConditionalWriterUsesClientProps to ConditionalWriterIT --- .../accumulo/core/clientImpl/ClientContext.java | 4 +-- .../core/clientImpl/ConditionalWriterImpl.java | 11 +++++++- .../apache/accumulo/test/ConditionalWriterIT.java | 33 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index e4ba0028c9..75b63940ce 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -786,9 +786,7 @@ public class ClientContext implements AccumuloClient { @Override public ConditionalWriter createConditionalWriter(String tableName) throws TableNotFoundException { - ensureOpen(); - return new ConditionalWriterImpl(this, requireNotOffline(getTableId(tableName), tableName), - tableName, new ConditionalWriterConfig()); + return createConditionalWriter(tableName, null); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index 3beedd49bf..cb7675196c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ 
-90,7 +90,9 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class ConditionalWriterImpl implements ConditionalWriter { +import com.google.common.annotations.VisibleForTesting; + +public class ConditionalWriterImpl implements ConditionalWriter { private static final Logger log = LoggerFactory.getLogger(ConditionalWriterImpl.class); @@ -106,6 +108,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private long timeout; private final Durability durability; private final String classLoaderContext; + private final ConditionalWriterConfig config; private static class ServerQueue { BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<>(); @@ -371,6 +374,7 @@ class ConditionalWriterImpl implements ConditionalWriter { ConditionalWriterImpl(ClientContext context, TableId tableId, String tableName, ConditionalWriterConfig config) { + this.config = config; this.context = context; this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); @@ -825,6 +829,11 @@ class ConditionalWriterImpl implements ConditionalWriter { } } + @VisibleForTesting + public ConditionalWriterConfig getConfig() { + return config; + } + @Override public Result write(ConditionalMutation mutation) { return write(Collections.singleton(mutation).iterator()).next(); diff --git a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java index 4e1ad83086..853269c390 100644 --- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -58,6 +58,7 @@ import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Result; import org.apache.accumulo.core.client.ConditionalWriter.Status; import 
org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.RowIterator; @@ -68,6 +69,8 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.clientImpl.ConditionalWriterImpl; +import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Condition; @@ -1536,4 +1539,34 @@ public class ConditionalWriterIT extends SharedMiniClusterBase { } } + @Test + public void testCreateConditionalWriterUsesClientProps() throws Exception { + // Tests that creating a conditional writer includes the client properties that were set + String tableName = getUniqueNames(1)[0]; + var clientProps = getClientProps(); + // Set non-default values for all conditional writer props + clientProps.setProperty(ClientProperty.CONDITIONAL_WRITER_TIMEOUT_MAX.getKey(), "99"); + clientProps.setProperty(ClientProperty.CONDITIONAL_WRITER_THREADS_MAX.getKey(), "101"); + clientProps.setProperty(ClientProperty.CONDITIONAL_WRITER_DURABILITY.getKey(), + Durability.NONE.name()); + try (AccumuloClient client = Accumulo.newClient().from(clientProps).build()) { + client.tableOperations().create(tableName); + + try ( + ConditionalWriterImpl cw1 = + (ConditionalWriterImpl) client.createConditionalWriter(tableName); + ConditionalWriterImpl cw2 = + (ConditionalWriterImpl) client.createConditionalWriter(tableName, + new ConditionalWriterConfig().setMaxWriteThreads(200))) { + // verify we see the non-default prop values + assertEquals(99, 
cw1.getConfig().getTimeout(TimeUnit.SECONDS)); + assertEquals(101, cw1.getConfig().getMaxWriteThreads()); + assertEquals(Durability.NONE, cw1.getConfig().getDurability()); + assertEquals(99, cw2.getConfig().getTimeout(TimeUnit.SECONDS)); + assertEquals(200, cw2.getConfig().getMaxWriteThreads()); + assertEquals(Durability.NONE, cw2.getConfig().getDurability()); + } + } + } + }