This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 1d1046c66bb IGNITE-16857 Java thin: Add AtomicLong (#10019) 1d1046c66bb is described below commit 1d1046c66bb2588a9c11c8067ed163d1a1938185 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu May 12 15:42:44 2022 +0300 IGNITE-16857 Java thin: Add AtomicLong (#10019) * Add `IgniteClient.atomicLong()` and `ClientAtomicLong` APIs. * Provides the same functionality as "thick" API, with the only exception of `AtomicConfiguration.affinityFunction`. --- .../ignite/client/ClientAtomicConfiguration.java | 145 +++++++++++++ .../org/apache/ignite/client/ClientAtomicLong.java | 150 +++++++++++++ .../apache/ignite/client/ClientOperationType.java | 40 +++- .../org/apache/ignite/client/IgniteClient.java | 23 ++ .../internal/client/thin/ClientAtomicLongImpl.java | 134 ++++++++++++ .../internal/client/thin/ClientOperation.java | 44 +++- .../internal/client/thin/TcpIgniteClient.java | 41 ++++ .../platform/client/ClientMessageParser.java | 50 +++++ .../ClientAtomicLongCreateRequest.java | 83 +++++++ .../ClientAtomicLongExistsRequest.java | 45 ++++ .../ClientAtomicLongRemoveRequest.java | 48 +++++ .../datastructures/ClientAtomicLongRequest.java | 76 +++++++ .../ClientAtomicLongValueAddAndGetRequest.java | 53 +++++ .../ClientAtomicLongValueCompareAndSetRequest.java | 57 +++++ .../ClientAtomicLongValueGetAndSetRequest.java | 53 +++++ .../ClientAtomicLongValueGetRequest.java | 48 +++++ .../internal/client/thin/AtomicLongTest.java | 238 +++++++++++++++++++++ .../org/apache/ignite/client/ClientTestSuite.java | 2 + .../org/apache/ignite/IgniteClientSpringBean.java | 12 ++ 19 files changed, 1340 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientAtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/client/ClientAtomicConfiguration.java new file mode 100644 index 00000000000..248265fdd72 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/client/ClientAtomicConfiguration.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.util.typedef.internal.S; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Configuration for atomic data structures. + */ +public class ClientAtomicConfiguration { + /** Default number of backups. */ + public static final int DFLT_BACKUPS = 1; + + /** Cache mode. */ + public static final CacheMode DFLT_CACHE_MODE = PARTITIONED; + + /** Default atomic sequence reservation size. */ + public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000; + + /** Atomic sequence reservation size. */ + private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE; + + /** Cache mode. */ + private CacheMode cacheMode = DFLT_CACHE_MODE; + + /** Number of backups. */ + private int backups = DFLT_BACKUPS; + + /** Group name. */ + private String grpName; + + /** + * Gets the number of backup nodes. + * + * @return Number of backup nodes. 
+ */ + public int getBackups() { + return backups; + } + + /** + * Sets the number of backup nodes. + * + * @param backups Number of backup nodes. + * @return {@code this} for chaining. + */ + public ClientAtomicConfiguration setBackups(int backups) { + this.backups = backups; + + return this; + } + + /** + * Gets the cache mode. + * + * @return Cache mode. + */ + public CacheMode getCacheMode() { + return cacheMode; + } + + /** + * Sets the cache mode. + * + * @param cacheMode Cache mode. + * @return {@code this} for chaining. + */ + public ClientAtomicConfiguration setCacheMode(CacheMode cacheMode) { + this.cacheMode = cacheMode; + + return this; + } + + /** + * Gets default number of sequence values reserved for {@link IgniteAtomicSequence} instances. After + * a certain number has been reserved, subsequent increments of sequence will happen locally, + * without communication with other nodes, until the next reservation has to be made. + * <p> + * Default value is {@link #DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE}. + * + * @return Atomic sequence reservation size. + */ + public int getAtomicSequenceReserveSize() { + return seqReserveSize; + } + + /** + * Sets default number of sequence values reserved for {@link IgniteAtomicSequence} instances. After a certain + * number has been reserved, subsequent increments of sequence will happen locally, without communication with other + * nodes, until the next reservation has to be made. + * + * @param seqReserveSize Atomic sequence reservation size. + * @see #getAtomicSequenceReserveSize() + * @return {@code this} for chaining. + */ + public ClientAtomicConfiguration setAtomicSequenceReserveSize(int seqReserveSize) { + this.seqReserveSize = seqReserveSize; + + return this; + } + + /** + * Gets the cache group name. + * + * @return Cache group name. + */ + public String getGroupName() { + return grpName; + } + + /** + * Sets the cache group name. + * + * @param grpName Cache group name. + * @return {@code this} for chaining. 
+ */ + public ClientAtomicConfiguration setGroupName(String grpName) { + this.grpName = grpName; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientAtomicConfiguration.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/client/ClientAtomicLong.java new file mode 100644 index 00000000000..f9a682afee2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/client/ClientAtomicLong.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import java.io.Closeable; +import org.apache.ignite.IgniteException; + +/** + * Distributed atomic long API. + * <p> + * <h1 class="header">Functionality</h1> + * Distributed atomic long includes the following main functionality: + * <ul> + * <li> + * Method {@link #get()} gets current value of atomic long. + * </li> + * <li> + * Various {@code get..(..)} methods get current value of atomic long + * and increase or decrease value of atomic long. 
+ * </li> + * <li> + * Method {@link #addAndGet(long l)} sums {@code l} with current value of atomic long + * and returns result. + * </li> + * <li> + * Method {@link #incrementAndGet()} increases value of atomic long and returns result. + * </li> + * <li> + * Method {@link #decrementAndGet()} decreases value of atomic long and returns result. + * </li> + * <li> + * Method {@link #getAndSet(long l)} gets current value of atomic long and sets {@code l} + * as value of atomic long. + * </li> + * <li> + * Method {@link #name()} gets name of atomic long. + * </li> + * </ul> + * <p> + * <h1 class="header">Creating Distributed Atomic Long</h1> + * Instance of distributed atomic long can be created by calling the following method: + * <ul> + * <li>{@link IgniteClient#atomicLong(String, long, boolean)}</li> + * </ul> + * @see IgniteClient#atomicLong(String, long, boolean) + */ +public interface ClientAtomicLong extends Closeable { + /** + * Name of atomic long. + * + * @return Name of atomic long. + */ + public String name(); + + /** + * Gets current value of atomic long. + * + * @return Current value of atomic long. + */ + public long get() throws IgniteException; + + /** + * Increments and gets current value of atomic long. + * + * @return Value. + */ + public long incrementAndGet() throws IgniteException; + + /** + * Gets and increments current value of atomic long. + * + * @return Value. + */ + public long getAndIncrement() throws IgniteException; + + /** + * Adds {@code l} and gets current value of atomic long. + * + * @param l Number which will be added. + * @return Value. + */ + public long addAndGet(long l) throws IgniteException; + + /** + * Gets current value of atomic long and adds {@code l}. + * + * @param l Number which will be added. + * @return Value. + */ + public long getAndAdd(long l) throws IgniteException; + + /** + * Decrements and gets current value of atomic long. + * + * @return Value. 
+ */ + public long decrementAndGet() throws IgniteException; + + /** + * Gets and decrements current value of atomic long. + * + * @return Value. + */ + public long getAndDecrement() throws IgniteException; + + /** + * Gets current value of atomic long and sets new value {@code l} of atomic long. + * + * @param l New value of atomic long. + * @return Value. + */ + public long getAndSet(long l) throws IgniteException; + + /** + * Atomically compares current value to the expected value, and if they are equal, sets current value + * to new value. + * + * @param expVal Expected atomic long's value. + * @param newVal New atomic long's value to set if current value equal to expected value. + * @return {@code True} if comparison succeeded, {@code false} otherwise. + */ + public boolean compareAndSet(long expVal, long newVal) throws IgniteException; + + /** + * Gets status of atomic. + * + * @return {@code true} if atomic was removed from cache, {@code false} in other case. + */ + public boolean removed(); + + /** + * Removes this atomic long. + */ + @Override public void close(); +} diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java b/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java index d5ac4b6eaeb..e95768d9640 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java +++ b/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java @@ -213,5 +213,43 @@ public enum ClientOperationType { /** * Get service descriptor ({@link ClientServices#serviceDescriptor(String)}). */ - SERVICE_GET_DESCRIPTOR + SERVICE_GET_DESCRIPTOR, + + /** + * Get or create an AtomicLong ({@link IgniteClient#atomicLong(String, long, boolean)}, + * {@link IgniteClient#atomicLong(String, ClientAtomicConfiguration, long, boolean)}). + */ + ATOMIC_LONG_CREATE, + + /** + * Remove an AtomicLong ({@link ClientAtomicLong#close()}). 
+ */ + ATOMIC_LONG_REMOVE, + + /** + * Check if AtomicLong exists ({@link ClientAtomicLong#removed()}). + */ + ATOMIC_LONG_EXISTS, + + /** + * AtomicLong.get ({@link ClientAtomicLong#get()}). + */ + ATOMIC_LONG_VALUE_GET, + + /** + * AtomicLong.addAndGet (includes {@link ClientAtomicLong#addAndGet(long)}, {@link ClientAtomicLong#incrementAndGet()}, + * {@link ClientAtomicLong#getAndIncrement()}, {@link ClientAtomicLong#getAndAdd(long)}, {@link ClientAtomicLong#decrementAndGet()}, + * {@link ClientAtomicLong#getAndDecrement()}). + */ + ATOMIC_LONG_VALUE_ADD_AND_GET, + + /** + * AtomicLong.getAndSet ({@link ClientAtomicLong#getAndSet(long)}). + */ + ATOMIC_LONG_VALUE_GET_AND_SET, + + /** + * AtomicLong.compareAndSet ({@link ClientAtomicLong#compareAndSet(long, long)}). + */ + ATOMIC_LONG_VALUE_COMPARE_AND_SET } diff --git a/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java index bb94b48e154..40774b4b72a 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/client/IgniteClient.java @@ -217,6 +217,29 @@ public interface IgniteClient extends AutoCloseable { */ public ClientServices services(ClientClusterGroup grp); + /** + * Gets an atomic long from cache and creates one if it has not been created yet and {@code create} flag + * is {@code true}. + * + * @param name Name of atomic long. + * @param initVal Initial value for atomic long. Ignored if {@code create} flag is {@code false}. + * @param create Boolean flag indicating whether data structure should be created if it does not exist. + * @return Atomic long. + */ + public ClientAtomicLong atomicLong(String name, long initVal, boolean create); + + /** + * Gets an atomic long from cache and creates one if it has not been created yet and {@code create} flag + * is {@code true}. + * + * @param name Name of atomic long. + * @param cfg Configuration. 
+ * @param initVal Initial value for atomic long. Ignored if {@code create} flag is {@code false}. + * @param create Boolean flag indicating whether data structure should be created if it does not exist. + * @return Atomic long. + */ + public ClientAtomicLong atomicLong(String name, ClientAtomicConfiguration cfg, long initVal, boolean create); + /** * Closes this client's open connections and relinquishes all underlying resources. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java new file mode 100644 index 00000000000..018eb18ad04 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.client.ClientAtomicLong; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.jetbrains.annotations.Nullable; + +/** + * Client atomic long. 
+ */ +public class ClientAtomicLongImpl implements ClientAtomicLong { + /** */ + private final String name; + + /** */ + private final String groupName; + + /** */ + private final ReliableChannel ch; + + /** + * Constructor. + * + * @param name Atomic long name. + * @param groupName Cache group name. + * @param ch Channel. + */ + public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableChannel ch) { + // name and groupName uniquely identify the data structure. + this.name = name; + this.groupName = groupName; + this.ch = ch; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public long get() throws IgniteException { + return ch.service(ClientOperation.ATOMIC_LONG_VALUE_GET, this::writeName, in -> in.in().readLong()); + } + + /** {@inheritDoc} */ + @Override public long incrementAndGet() throws IgniteException { + return addAndGet(1); + } + + /** {@inheritDoc} */ + @Override public long getAndIncrement() throws IgniteException { + return incrementAndGet() - 1; + } + + /** {@inheritDoc} */ + @Override public long addAndGet(long l) throws IgniteException { + return ch.service(ClientOperation.ATOMIC_LONG_VALUE_ADD_AND_GET, out -> { + writeName(out); + out.out().writeLong(l); + }, in -> in.in().readLong()); + } + + /** {@inheritDoc} */ + @Override public long getAndAdd(long l) throws IgniteException { + return addAndGet(l) - l; + } + + /** {@inheritDoc} */ + @Override public long decrementAndGet() throws IgniteException { + return addAndGet(-1); + } + + /** {@inheritDoc} */ + @Override public long getAndDecrement() throws IgniteException { + return decrementAndGet() + 1; + } + + /** {@inheritDoc} */ + @Override public long getAndSet(long l) throws IgniteException { + return ch.service(ClientOperation.ATOMIC_LONG_VALUE_GET_AND_SET, out -> { + writeName(out); + out.out().writeLong(l); + }, in -> in.in().readLong()); + } + + /** {@inheritDoc} */ + @Override public boolean 
compareAndSet(long expVal, long newVal) throws IgniteException { + return ch.service(ClientOperation.ATOMIC_LONG_VALUE_COMPARE_AND_SET, out -> { + writeName(out); + out.out().writeLong(expVal); + out.out().writeLong(newVal); + }, in -> in.in().readBoolean()); + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return ch.service(ClientOperation.ATOMIC_LONG_EXISTS, this::writeName, in -> !in.in().readBoolean()); + } + + /** {@inheritDoc} */ + @Override public void close() { + ch.service(ClientOperation.ATOMIC_LONG_REMOVE, this::writeName, null); + } + + /** + * Writes the name. + * + * @param out Output channel. + */ + private void writeName(PayloadOutputChannel out) { + try (BinaryRawWriterEx w = new BinaryWriterExImpl(null, out.out(), null, null)) { + w.writeString(name); + w.writeString(groupName); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java index 8e63d60bd2c..6eb2b222ed6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java @@ -193,7 +193,28 @@ public enum ClientOperation { SERVICE_GET_DESCRIPTORS(7001), /** Get service descriptors. */ - SERVICE_GET_DESCRIPTOR(7002); + SERVICE_GET_DESCRIPTOR(7002), + + /** Get or create an AtomicLong by name. */ + ATOMIC_LONG_CREATE(9000), + + /** Remove an AtomicLong. */ + ATOMIC_LONG_REMOVE(9001), + + /** Check if AtomicLong exists. */ + ATOMIC_LONG_EXISTS(9002), + + /** AtomicLong.get. */ + ATOMIC_LONG_VALUE_GET(9003), + + /** AtomicLong.addAndGet (also covers incrementAndGet, getAndIncrement, getAndAdd, decrementAndGet, getAndDecrement). */ + ATOMIC_LONG_VALUE_ADD_AND_GET(9004), + + /** AtomicLong.getAndSet. */ + ATOMIC_LONG_VALUE_GET_AND_SET(9005), + + /** AtomicLong.compareAndSet. 
*/ + ATOMIC_LONG_VALUE_COMPARE_AND_SET(9006); /** Code. */ private final int code; @@ -351,6 +372,27 @@ public enum ClientOperation { case SERVICE_GET_DESCRIPTOR: return ClientOperationType.SERVICE_GET_DESCRIPTOR; + case ATOMIC_LONG_CREATE: + return ClientOperationType.ATOMIC_LONG_CREATE; + + case ATOMIC_LONG_REMOVE: + return ClientOperationType.ATOMIC_LONG_REMOVE; + + case ATOMIC_LONG_EXISTS: + return ClientOperationType.ATOMIC_LONG_EXISTS; + + case ATOMIC_LONG_VALUE_GET: + return ClientOperationType.ATOMIC_LONG_VALUE_GET; + + case ATOMIC_LONG_VALUE_ADD_AND_GET: + return ClientOperationType.ATOMIC_LONG_VALUE_ADD_AND_GET; + + case ATOMIC_LONG_VALUE_GET_AND_SET: + return ClientOperationType.ATOMIC_LONG_VALUE_GET_AND_SET; + + case ATOMIC_LONG_VALUE_COMPARE_AND_SET: + return ClientOperationType.ATOMIC_LONG_VALUE_COMPARE_AND_SET; + default: return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java index 236f6ba1eba..824242dd47a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java @@ -33,6 +33,8 @@ import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.client.ClientAtomicConfiguration; +import org.apache.ignite.client.ClientAtomicLong; import org.apache.ignite.client.ClientCache; import org.apache.ignite.client.ClientCacheConfiguration; import org.apache.ignite.client.ClientCluster; @@ -57,6 +59,7 @@ import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import 
org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; +import org.apache.ignite.internal.util.GridArgumentCheck; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.MarshallerContext; @@ -329,6 +332,44 @@ public class TcpIgniteClient implements IgniteClient { return services.withClusterGroup((ClientClusterGroupImpl)grp); } + /** {@inheritDoc} */ + @Override public ClientAtomicLong atomicLong(String name, long initVal, boolean create) { + return atomicLong(name, null, initVal, create); + } + + /** {@inheritDoc} */ + @Override public ClientAtomicLong atomicLong(String name, ClientAtomicConfiguration cfg, long initVal, boolean create) { + GridArgumentCheck.notNull(name, "name"); + + if (create) { + ch.service(ClientOperation.ATOMIC_LONG_CREATE, out -> { + try (BinaryRawWriterEx w = new BinaryWriterExImpl(null, out.out(), null, null)) { + w.writeString(name); + w.writeLong(initVal); + + if (cfg != null) { + w.writeBoolean(true); + w.writeInt(cfg.getAtomicSequenceReserveSize()); + w.writeByte((byte)cfg.getCacheMode().ordinal()); + w.writeInt(cfg.getBackups()); + w.writeString(cfg.getGroupName()); + } + else + w.writeBoolean(false); + } + + }, null); + } + + ClientAtomicLong res = new ClientAtomicLongImpl(name, cfg != null ? cfg.getGroupName() : null, ch); + + // Return null when specified atomic long does not exist to match IgniteKernal behavior. + if (!create && res.removed()) + return null; + + return res; + } + /** * Initializes new instance of {@link IgniteClient}. 
* diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index ea71b3dc304..876ec9632ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -78,6 +78,13 @@ import org.apache.ignite.internal.processors.platform.client.cluster.ClientClust import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterWalChangeStateRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterWalGetStateRequest; import org.apache.ignite.internal.processors.platform.client.compute.ClientExecuteTaskRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongCreateRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongExistsRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongRemoveRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueAddAndGetRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueCompareAndSetRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetAndSetRequest; +import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetRequest; import org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorRequest; import org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorsRequest; import org.apache.ignite.internal.processors.platform.client.service.ClientServiceInvokeRequest; @@ -289,6 
+296,28 @@ public class ClientMessageParser implements ClientListenerMessageParser { /** */ private static final short OP_DATA_STREAMER_ADD_DATA = 8001; + /** Data structures. */ + /** Create an AtomicLong. */ + private static final short OP_ATOMIC_LONG_CREATE = 9000; + + /** Remove an AtomicLong. */ + private static final short OP_ATOMIC_LONG_REMOVE = 9001; + + /** Check if AtomicLong exists. */ + private static final short OP_ATOMIC_LONG_EXISTS = 9002; + + /** AtomicLong.get. */ + private static final short OP_ATOMIC_LONG_VALUE_GET = 9003; + + /** AtomicLong.addAndGet (also covers incrementAndGet, getAndIncrement, getAndAdd, decrementAndGet, getAndDecrement). */ + private static final short OP_ATOMIC_LONG_VALUE_ADD_AND_GET = 9004; + + /** AtomicLong.getAndSet. */ + private static final short OP_ATOMIC_LONG_VALUE_GET_AND_SET = 9005; + + /** AtomicLong.compareAndSet. */ + private static final short OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET = 9006; + /** Marshaller. */ private final GridBinaryMarshaller marsh; @@ -516,6 +545,27 @@ public class ClientMessageParser implements ClientListenerMessageParser { case OP_DATA_STREAMER_ADD_DATA: return new ClientDataStreamerAddDataRequest(reader); + + case OP_ATOMIC_LONG_CREATE: + return new ClientAtomicLongCreateRequest(reader); + + case OP_ATOMIC_LONG_REMOVE: + return new ClientAtomicLongRemoveRequest(reader); + + case OP_ATOMIC_LONG_EXISTS: + return new ClientAtomicLongExistsRequest(reader); + + case OP_ATOMIC_LONG_VALUE_GET: + return new ClientAtomicLongValueGetRequest(reader); + + case OP_ATOMIC_LONG_VALUE_ADD_AND_GET: + return new ClientAtomicLongValueAddAndGetRequest(reader); + + case OP_ATOMIC_LONG_VALUE_GET_AND_SET: + return new ClientAtomicLongValueGetAndSetRequest(reader); + + case OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET: + return new ClientAtomicLongValueCompareAndSetRequest(reader); } return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE, diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongCreateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongCreateRequest.java new file mode 100644 index 00000000000..70ef46d2f58 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongCreateRequest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientRequest; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; +import org.jetbrains.annotations.Nullable; + +/** + * Gets or creates atomic long by name. 
+ */ +public class ClientAtomicLongCreateRequest extends ClientRequest { + /** Atomic long name. */ + private final String name; + + /** Initial value. */ + private final long initVal; + + /** Configuration. */ + private final AtomicConfiguration atomicConfiguration; + + /** + * Constructor. + * + * @param reader Reader. + */ + public ClientAtomicLongCreateRequest(BinaryRawReader reader) { + super(reader); + + name = reader.readString(); + initVal = reader.readLong(); + atomicConfiguration = readAtomicConfiguration(reader); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + try { + ctx.kernalContext().dataStructures().atomicLong(name, atomicConfiguration, initVal, true); + + return new ClientResponse(requestId()); + } + catch (IgniteCheckedException e) { + return new ClientResponse(requestId(), e.getMessage()); + } + } + + /** + * Reads the atomic configuration. + * + * @param reader Reader. + * @return Config. + */ + @Nullable private static AtomicConfiguration readAtomicConfiguration(BinaryRawReader reader) { + if (!reader.readBoolean()) + return null; + + return new AtomicConfiguration() + .setAtomicSequenceReserveSize(reader.readInt()) + .setCacheMode(CacheMode.fromOrdinal(reader.readByte())) + .setBackups(reader.readInt()) + .setGroupName(reader.readString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongExistsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongExistsRequest.java new file mode 100644 index 00000000000..e25d2503bb5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongExistsRequest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Atomic long exists request: responds with {@code true} when the lookup finds the atomic long, {@code false} otherwise. + */ +public class ClientAtomicLongExistsRequest extends ClientAtomicLongRequest { + /** + * Constructor. + * + * @param reader Reader.
+ */ + public ClientAtomicLongExistsRequest(BinaryRawReader reader) { + super(reader); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + IgniteAtomicLong atomicLong = atomicLong(ctx); + + return new ClientBooleanResponse(requestId(), atomicLong != null); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRemoveRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRemoveRequest.java new file mode 100644 index 00000000000..38ba4ff888b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRemoveRequest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Atomic long remove request: closes the atomic long when it is found and always responds with success. + */ +public class ClientAtomicLongRemoveRequest extends ClientAtomicLongRequest { + /** + * Constructor. + * + * @param reader Reader. + */ + public ClientAtomicLongRemoveRequest(BinaryRawReader reader) { + super(reader); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + IgniteAtomicLong atomicLong = atomicLong(ctx); + + // Same semantics as IgniteAtomicLong - do nothing when it does not exist. + if (atomicLong != null) + atomicLong.close(); + + return new ClientResponse(requestId()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java new file mode 100644 index 00000000000..706329337b2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientRequest; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Base class for requests that address an atomic long by name and cache group name. + */ +public class ClientAtomicLongRequest extends ClientRequest { + /** Atomic long name. */ + private final String name; + + /** Cache group name; when {@code null}, no configuration is passed to the lookup. */ + private final String groupName; + + /** + * Constructor. Reads the atomic long name and then the cache group name. + * + * @param reader Reader. + */ + public ClientAtomicLongRequest(BinaryRawReader reader) { + super(reader); + + name = reader.readString(); + groupName = reader.readString(); + } + + /** + * Looks up the atomic long by name and cache group name; checked failures are rethrown as {@link IgniteException}. + * + * @param ctx Context. + * @return Atomic long, or {@code null} if it was not found. + */ + protected IgniteAtomicLong atomicLong(ClientConnectionContext ctx) { + AtomicConfiguration cfg = groupName == null ? null : new AtomicConfiguration().setGroupName(groupName); + + try { + return ctx.kernalContext().dataStructures().atomicLong(name, cfg, 0, false); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e.getMessage(), e); + } + } + + /** + * Gets a response for non-existent atomic long.
+ * + * @return Response for non-existent atomic long. + */ + protected ClientResponse notFoundResponse() { + return new ClientResponse(requestId(), String.format("AtomicLong with name '%s' does not exist.", name)); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueAddAndGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueAddAndGetRequest.java new file mode 100644 index 00000000000..3fa0513faaf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueAddAndGetRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientLongResponse; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Atomic long add and get request: adds the operand and responds with the resulting value. + */ +public class ClientAtomicLongValueAddAndGetRequest extends ClientAtomicLongRequest { + /** Value to add. */ + private final long operand; + + /** + * Constructor. + * + * @param reader Reader. + */ + public ClientAtomicLongValueAddAndGetRequest(BinaryRawReader reader) { + super(reader); + + operand = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + IgniteAtomicLong atomicLong = atomicLong(ctx); + + if (atomicLong == null) + return notFoundResponse(); + + return new ClientLongResponse(requestId(), atomicLong.addAndGet(operand)); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java new file mode 100644 index 00000000000..ee550c6a8bd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Atomic long compare and set request. + */ +public class ClientAtomicLongValueCompareAndSetRequest extends ClientAtomicLongRequest { + /** Expected value. */ + private final long expected; + + /** New value. */ + private final long val; + + /** + * Constructor. Reads the expected value and then the new value. + * + * @param reader Reader.
+ */ + public ClientAtomicLongValueCompareAndSetRequest(BinaryRawReader reader) { + super(reader); + + expected = reader.readLong(); + val = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + IgniteAtomicLong atomicLong = atomicLong(ctx); + + if (atomicLong == null) + return notFoundResponse(); + + return new ClientBooleanResponse(requestId(), atomicLong.compareAndSet(expected, val)); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueGetAndSetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueGetAndSetRequest.java new file mode 100644 index 00000000000..a1ff364cb35 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueGetAndSetRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientLongResponse; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Atomic long get and set request. + */ +public class ClientAtomicLongValueGetAndSetRequest extends ClientAtomicLongRequest { + /** New value to set. */ + private final long operand; + + /** + * Constructor. + * + * @param reader Reader. + */ + public ClientAtomicLongValueGetAndSetRequest(BinaryRawReader reader) { + super(reader); + + operand = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + IgniteAtomicLong atomicLong = atomicLong(ctx); + + if (atomicLong == null) + return notFoundResponse(); + + return new ClientLongResponse(requestId(), atomicLong.getAndSet(operand)); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueGetRequest.java new file mode 100644 index 00000000000..d45ad21e89b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueGetRequest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.datastructures; + +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientLongResponse; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Atomic long value get request. + */ +public class ClientAtomicLongValueGetRequest extends ClientAtomicLongRequest { + /** + * Constructor. + * + * @param reader Reader. + */ + public ClientAtomicLongValueGetRequest(BinaryRawReader reader) { + super(reader); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + IgniteAtomicLong atomicLong = atomicLong(ctx); + + if (atomicLong == null) + return notFoundResponse(); + + return new ClientLongResponse(requestId(), atomicLong.get()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AtomicLongTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AtomicLongTest.java new file mode 100644 index 00000000000..f7a4ab999dc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AtomicLongTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.client.ClientAtomicConfiguration; +import org.apache.ignite.client.ClientAtomicLong; +import org.apache.ignite.client.ClientException; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.junit.Test; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; + +/** + * Tests client atomic long against a single grid started once for all tests. + */ +public class AtomicLongTest extends AbstractThinClientTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * Tests that the initial value is applied on creation, independently for the same name with and without a group.
+ */ + @Test + public void testCreateSetsInitialValue() { + String name = "testCreateSetsInitialValue"; + + try (IgniteClient client = startClient(0)) { + ClientAtomicLong atomicLong = client.atomicLong(name, 42, true); + + ClientAtomicLong atomicLongWithGroup = client.atomicLong( + name, new ClientAtomicConfiguration().setGroupName("grp"), 43, true); + + assertEquals(42, atomicLong.get()); + assertEquals(43, atomicLongWithGroup.get()); + } + } + + /** + * Tests that initial value is ignored when atomic long already exists. + */ + @Test + public void testCreateIgnoresInitialValueWhenAlreadyExists() { + String name = "testCreateIgnoresInitialValueWhenAlreadyExists"; + + try (IgniteClient client = startClient(0)) { + ClientAtomicLong atomicLong = client.atomicLong(name, 42, true); + ClientAtomicLong atomicLong2 = client.atomicLong(name, -42, true); + + assertEquals(42, atomicLong.get()); + assertEquals(42, atomicLong2.get()); + } + } + + /** + * Tests that every value operation throws a "does not exist" error after the atomic long has been closed. + */ + @Test + public void testOperationsThrowExceptionWhenAtomicLongDoesNotExist() { + try (IgniteClient client = startClient(0)) { + String name = "testOperationsThrowExceptionWhenAtomicLongDoesNotExist"; + ClientAtomicLong atomicLong = client.atomicLong(name, 0, true); + atomicLong.close(); + + assertDoesNotExistError(name, atomicLong::get); + + assertDoesNotExistError(name, atomicLong::incrementAndGet); + assertDoesNotExistError(name, atomicLong::getAndIncrement); + assertDoesNotExistError(name, atomicLong::decrementAndGet); + assertDoesNotExistError(name, atomicLong::getAndDecrement); + + assertDoesNotExistError(name, () -> atomicLong.addAndGet(1)); + assertDoesNotExistError(name, () -> atomicLong.getAndAdd(1)); + + assertDoesNotExistError(name, () -> atomicLong.getAndSet(1)); + assertDoesNotExistError(name, () -> atomicLong.compareAndSet(1, 2)); + } + } + + /** + * Tests removed property.
+ */ + @Test + public void testRemoved() { + String name = "testRemoved"; + + try (IgniteClient client = startClient(0)) { + ClientAtomicLong atomicLong = client.atomicLong(name, 0, false); + assertNull(atomicLong); + + atomicLong = client.atomicLong(name, 1, true); + assertFalse(atomicLong.removed()); + assertEquals(1, atomicLong.get()); + + atomicLong.close(); + assertTrue(atomicLong.removed()); + } + } + + /** + * Tests increment, decrement, add. + */ + @Test + public void testIncrementDecrementAdd() { + String name = "testIncrementDecrementAdd"; + + try (IgniteClient client = startClient(0)) { + ClientAtomicLong atomicLong = client.atomicLong(name, 1, true); + + assertEquals(2, atomicLong.incrementAndGet()); + assertEquals(2, atomicLong.getAndIncrement()); + + assertEquals(3, atomicLong.get()); + + assertEquals(2, atomicLong.decrementAndGet()); + assertEquals(2, atomicLong.getAndDecrement()); + + assertEquals(1, atomicLong.get()); + + assertEquals(101, atomicLong.addAndGet(100)); + assertEquals(101, atomicLong.getAndAdd(-50)); + + assertEquals(51, atomicLong.get()); + } + } + + /** + * Tests getAndSet. + */ + @Test + public void testGetAndSet() { + String name = "testGetAndSet"; + + try (IgniteClient client = startClient(0)) { + ClientAtomicLong atomicLong = client.atomicLong(name, 1, true); + + assertEquals(1, atomicLong.getAndSet(100)); + assertEquals(100, atomicLong.get()); + } + } + + /** + * Tests compareAndSet. + */ + @Test + public void testCompareAndSet() { + String name = "testCompareAndSet"; + + try (IgniteClient client = startClient(0)) { + ClientAtomicLong atomicLong = client.atomicLong(name, 1, true); + + assertFalse(atomicLong.compareAndSet(2, 3)); + assertEquals(1, atomicLong.get()); + + assertTrue(atomicLong.compareAndSet(1, 4)); + assertEquals(4, atomicLong.get()); + } + } + + /** + * Tests custom configuration. NOTE(review): the cache count and index checks look test-order dependent - confirm.
+ */ + @Test + public void testCustomConfigurationPropagatesToServer() { + ClientAtomicConfiguration cfg1 = new ClientAtomicConfiguration() + .setAtomicSequenceReserveSize(64) + .setBackups(2) + .setCacheMode(CacheMode.PARTITIONED) + .setGroupName("atomic-long-group-partitioned"); + + ClientAtomicConfiguration cfg2 = new ClientAtomicConfiguration() + .setAtomicSequenceReserveSize(32) + .setBackups(3) + .setCacheMode(CacheMode.REPLICATED) + .setGroupName("atomic-long-group-replicated"); + + String name = "testCustomConfiguration"; + + try (IgniteClient client = startClient(0)) { + client.atomicLong(name, cfg1, 1, true); + client.atomicLong(name, cfg2, 2, true); + } + + List<IgniteInternalCache<?, ?>> caches = new ArrayList<>(grid(0).cachesx()); + assertEquals(3, caches.size()); + + IgniteInternalCache<?, ?> partitionedCache = caches.get(1); + IgniteInternalCache<?, ?> replicatedCache = caches.get(2); + + assertEquals("ignite-sys-atomic-cache@atomic-long-group-partitioned", partitionedCache.name()); + assertEquals("ignite-sys-atomic-cache@atomic-long-group-replicated", replicatedCache.name()); + + assertEquals(2, partitionedCache.configuration().getBackups()); + assertEquals(Integer.MAX_VALUE, replicatedCache.configuration().getBackups()); + } + + /** + * Asserts that "does not exist" error is thrown. + * + * @param name Atomic long name. + * @param callable Operation expected to fail with {@link ClientException}.
+ */ + private void assertDoesNotExistError(String name, Callable<Object> callable) { + ClientException ex = (ClientException)assertThrows(null, callable, ClientException.class, null); + + assertContains(null, ex.getMessage(), "AtomicLong with name '" + name + "' does not exist."); + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java index b9b58cb5314..d21bff31efc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java @@ -17,6 +17,7 @@ package org.apache.ignite.client; +import org.apache.ignite.internal.client.thin.AtomicLongTest; import org.apache.ignite.internal.client.thin.CacheAsyncTest; import org.apache.ignite.internal.client.thin.CacheEntryListenersTest; import org.apache.ignite.internal.client.thin.ClusterApiTest; @@ -69,6 +70,7 @@ import org.junit.runners.Suite; CacheAsyncTest.class, TimeoutTest.class, OptimizedMarshallerClassesCachedTest.class, + AtomicLongTest.class }) public class ClientTestSuite { // No-op.
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteClientSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteClientSpringBean.java index 74f29244969..01015576394 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteClientSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteClientSpringBean.java @@ -21,6 +21,8 @@ import java.util.Collection; import java.util.List; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.client.ClientAtomicConfiguration; +import org.apache.ignite.client.ClientAtomicLong; import org.apache.ignite.client.ClientCache; import org.apache.ignite.client.ClientCacheConfiguration; import org.apache.ignite.client.ClientCluster; @@ -228,6 +230,16 @@ public class IgniteClientSpringBean implements IgniteClient, SmartLifecycle { return cli.services(grp); } + /** {@inheritDoc} */ + @Override public ClientAtomicLong atomicLong(String name, long initVal, boolean create) { + return cli.atomicLong(name, initVal, create); + } + + /** {@inheritDoc} */ + @Override public ClientAtomicLong atomicLong(String name, ClientAtomicConfiguration cfg, long initVal, boolean create) { + return cli.atomicLong(name, cfg, initVal, create); + } + /** {@inheritDoc} */ @Override public void close() { cli.close();