http://git-wip-us.apache.org/repos/asf/ignite/blob/c56d16fb/modules/platforms/nodejs/index.js ---------------------------------------------------------------------- diff --git a/modules/platforms/nodejs/index.js b/modules/platforms/nodejs/index.js new file mode 100644 index 0000000..cce86ab --- /dev/null +++ b/modules/platforms/nodejs/index.js @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +module.exports = require('./lib/IgniteClient'); +module.exports.ObjectType = require('./lib/ObjectType').ObjectType; +module.exports.MapObjectType = require('./lib/ObjectType').MapObjectType; +module.exports.CollectionObjectType = require('./lib/ObjectType').CollectionObjectType; +module.exports.ComplexObjectType = require('./lib/ObjectType').ComplexObjectType; +module.exports.ObjectArrayType = require('./lib/ObjectType').ObjectArrayType; +module.exports.BinaryObject = require('./lib/BinaryObject'); +module.exports.Timestamp = require('./lib/Timestamp'); +module.exports.EnumItem = require('./lib/EnumItem'); +module.exports.Decimal = require('decimal.js'); +module.exports.Errors = require('./lib/Errors'); +module.exports.IgniteClientConfiguration = require('./lib/IgniteClientConfiguration'); +module.exports.CacheClient = require('./lib/CacheClient'); +module.exports.CacheEntry = require('./lib/CacheClient').CacheEntry; +module.exports.CacheConfiguration = require('./lib/CacheConfiguration'); +module.exports.QueryEntity = require('./lib/CacheConfiguration').QueryEntity; +module.exports.QueryField = require('./lib/CacheConfiguration').QueryField; +module.exports.QueryIndex = require('./lib/CacheConfiguration').QueryIndex; +module.exports.CacheKeyConfiguration = require('./lib/CacheConfiguration').CacheKeyConfiguration; +module.exports.SqlQuery = require('./lib/Query').SqlQuery; +module.exports.SqlFieldsQuery = require('./lib/Query').SqlFieldsQuery; +module.exports.ScanQuery = require('./lib/Query').ScanQuery; +module.exports.Cursor = require('./lib/Cursor').Cursor; +module.exports.SqlFieldsCursor = require('./lib/Cursor').SqlFieldsCursor;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c56d16fb/modules/platforms/nodejs/lib/BinaryObject.js ---------------------------------------------------------------------- diff --git a/modules/platforms/nodejs/lib/BinaryObject.js b/modules/platforms/nodejs/lib/BinaryObject.js new file mode 100644 index 0000000..2cc6be6 --- /dev/null +++ b/modules/platforms/nodejs/lib/BinaryObject.js @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const Util = require('util'); +const ObjectType = require('./ObjectType').ObjectType; +const ComplexObjectType = require('./ObjectType').ComplexObjectType; +const Errors = require('./Errors'); +const BinaryUtils = require('./internal/BinaryUtils'); +const BinaryType = require('./internal/BinaryType'); +const BinaryField = require('./internal/BinaryType').BinaryField; +const BinaryTypeBuilder = require('./internal/BinaryType').BinaryTypeBuilder; +const BinaryWriter = require('./internal/BinaryWriter'); +const ArgumentChecker = require('./internal/ArgumentChecker'); +const Logger = require('./internal/Logger'); + +const HEADER_LENGTH = 24; +const VERSION = 1; + +// user type +const FLAG_USER_TYPE = 0x0001; +// schema exists +const FLAG_HAS_SCHEMA = 0x0002; +// object contains raw data +const FLAG_HAS_RAW_DATA = 0x0004; +// offsets take 1 byte +const FLAG_OFFSET_ONE_BYTE = 0x0008; +// offsets take 2 bytes +const FLAG_OFFSET_TWO_BYTES = 0x0010; +// compact footer, no field IDs +const FLAG_COMPACT_FOOTER = 0x0020; + +/** + * Class representing a complex Ignite object in the binary form. + * + * It corresponds to COMPOSITE_TYPE.COMPLEX_OBJECT {@link ObjectType.COMPOSITE_TYPE}, + * has mandatory type Id, which corresponds to a name of the complex type, + * and includes optional fields. + * + * An instance of the BinaryObject can be obtained/created by the following ways: + * - returned by the client when a complex object is received from Ignite cache + * and is not deserialized to another JavaScript object. + * - created using the public constructor. Fields may be added to such an instance using setField() method. + * - created from a JavaScript object using static fromObject() method. + */ +class BinaryObject { + + /** + * Creates an instance of the BinaryObject without any fields. + * + * Fields may be added later using setField() method. + * + * @param {string} typeName - name of the complex type to generate the type Id. + * + * @return {BinaryObject} - new BinaryObject instance. + * + * @throws {IgniteClientError} if error. + */ + constructor(typeName) { + ArgumentChecker.notEmpty(typeName, 'typeName'); + this._buffer = null; + this._fields = new Map(); + this._typeBuilder = BinaryTypeBuilder.fromTypeName(typeName); + this._modified = false; + this._schemaOffset = null; + this._compactFooter = false; + } + + /** + * Creates an instance of the BinaryObject from the specified instance of JavaScript Object. + * + * All fields of the JavaScript Object instance with their values are added to the BinaryObject. + * Fields may be added or removed later using setField() and removeField() methods. + * + * If complexObjectType parameter is specified, then the type Id is taken from it. + * Otherwise, the type Id is generated from the name of the JavaScript Object. + * + * @async + * + * @param {object} jsObject - instance of JavaScript Object + * which adds and initializes the fields of the BinaryObject instance. + * @param {ComplexObjectType} [complexObjectType] - instance of complex type definition + * which specifies non-standard mapping of the fields of the BinaryObject instance + * to/from the Ignite types. + * + * @return {BinaryObject} - new BinaryObject instance. + * + * @throws {IgniteClientError} if error. + */ + static async fromObject(jsObject, complexObjectType = null) { + ArgumentChecker.notEmpty(jsObject, 'jsObject'); + ArgumentChecker.hasType(complexObjectType, 'complexObjectType', false, ComplexObjectType); + const typeBuilder = BinaryTypeBuilder.fromObject(jsObject, complexObjectType); + const result = new BinaryObject(typeBuilder.getTypeName()); + result._typeBuilder = typeBuilder; + let fieldName; + for (let field of result._typeBuilder.getFields()) { + fieldName = field.name; + if (jsObject && jsObject[fieldName] !== undefined) { + result.setField( + fieldName, + jsObject[fieldName], + complexObjectType ? complexObjectType._getFieldType(fieldName) : null); + } + else { + throw Errors.IgniteClientError.serializationError( + true, Util.format('field "%s" is undefined', fieldName)); + } + } + return result; + } + + /** + * Sets the new value of the specified field. + * Adds the specified field, if it did not exist before. + * + * Optionally, specifies a type of the field. + * If the type is not specified then during operations the Ignite client + * will try to make automatic mapping between JavaScript types and Ignite object types - + * according to the mapping table defined in the description of the {@link ObjectType} class. + * + * @param {string} fieldName - name of the field. + * @param {*} fieldValue - new value of the field. + * @param {ObjectType.PRIMITIVE_TYPE | CompositeType} [fieldType] - type of the field: + * - either a type code of primitive (simple) type + * - or an instance of class representing non-primitive (composite) type + * - or null (or not specified) that means the type is not specified. + * + * @return {BinaryObject} - the same instance of BinaryObject + * + * @throws {IgniteClientError} if error. + */ + setField(fieldName, fieldValue, fieldType = null) { + ArgumentChecker.notEmpty(fieldName, 'fieldName'); + this._modified = true; + const field = new BinaryObjectField(fieldName, fieldValue, fieldType); + this._fields.set(field.id, field); + this._typeBuilder.setField(fieldName, field.typeCode); + return this; + } + + /** + * Removes the specified field. + * Does nothing if the field does not exist. + * + * @param {string} fieldName - name of the field. + * + * @return {BinaryObject} - the same instance of BinaryObject + * + * @throws {IgniteClientError} if error. + */ + removeField(fieldName) { + ArgumentChecker.notEmpty(fieldName, 'fieldName'); + this._modified = true; + this._fields.delete(BinaryField._calculateId(fieldName)); + this._typeBuilder.removeField(fieldName); + return this; + } + + /** + * Checks if the specified field exists in this BinaryObject instance. + * + * @param {string} fieldName - name of the field. + * + * @return {boolean} - true if exists, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + hasField(fieldName) { + ArgumentChecker.notEmpty(fieldName, 'fieldName'); + return this._fields.has(BinaryField._calculateId(fieldName)); + } + + /** + * Returns a value of the specified field. + * + * Optionally, specifies a type of the field. + * If the type is not specified then the Ignite client + * will try to make automatic mapping between JavaScript types and Ignite object types - + * according to the mapping table defined in the description of the {@link ObjectType} class. + * + * @async + * + * @param {string} fieldName - name of the field. + * @param {ObjectType.PRIMITIVE_TYPE | CompositeType} [fieldType] - type of the field: + * - either a type code of primitive (simple) type + * - or an instance of class representing non-primitive (composite) type + * - or null (or not specified) that means the type is not specified. + * + * @return {*} - value of the field or JavaScript undefined if the field does not exist. + * + * @throws {IgniteClientError} if error. + */ + async getField(fieldName, fieldType = null) { + ArgumentChecker.notEmpty(fieldName, 'fieldName'); + const field = this._fields.get(BinaryField._calculateId(fieldName)); + return field ? await field.getValue(fieldType) : undefined; + } + + /** + * Deserializes this BinaryObject instance into an instance of the specified complex object type. + * + * @async + * + * @param {ComplexObjectType} complexObjectType - instance of class representing complex object type. + * + * @return {object} - instance of the JavaScript object + * which corresponds to the specified complex object type. + * + * @throws {IgniteClientError} if error. + */ + async toObject(complexObjectType) { + ArgumentChecker.notNull(complexObjectType, 'complexObjectType'); + ArgumentChecker.hasType(complexObjectType, 'complexObjectType', false, ComplexObjectType); + const result = new (complexObjectType._objectConstructor); + let binaryField; + let fieldName; + for (let field of this._fields.values()) { + binaryField = this._typeBuilder.getField(field.id); + if (!binaryField) { + throw Errors.IgniteClientError.serializationError( + false, Util.format('field with id "%s" can not be deserialized', field.id)); + } + fieldName = binaryField.name; + result[fieldName] = await field.getValue(complexObjectType._getFieldType(fieldName)); + } + return result; + } + + /** + * Returns type name of this BinaryObject instance. + * + * @return {string} - type name. + */ + getTypeName() { + return this._typeBuilder.getTypeName(); + } + + /** + * Returns names of all fields of this BinaryObject instance. + * + * @return {Array<string>} - names of all fields. + * + * @throws {IgniteClientError} if error. + */ + getFieldNames() { + return this._typeBuilder._schema.fieldIds.map(fieldId => { + const field = this._typeBuilder.getField(fieldId); + if (field) { + return field.name; + } + else { + throw Errors.IgniteClientError.internalError( + Util.format('Field "%s" is absent in binary type fields', fieldId)); + } + }); + } + + /** Private methods */ + + /** + * @ignore + */ + static _isFlagSet(flags, flag) { + return (flags & flag) === flag; + } + + /** + * @ignore + */ + static async _fromBuffer(buffer) { + const result = new BinaryObject(new ComplexObjectType({})._typeName); + result._buffer = buffer; + result._startPos = buffer.position; + await result._read(); + return result; + } + + /** + * @ignore + */ + async _write(buffer) { + if (this._buffer && !this._modified) { + buffer.writeBuffer(this._buffer.buffer, this._startPos, this._startPos + this._length); + } + else { + await this._typeBuilder.finalize(); + this._startPos = buffer.position; + buffer.position = this._startPos + HEADER_LENGTH; + // write fields + for (let field of this._fields.values()) { + await field._writeValue(buffer, this._typeBuilder.getField(field.id).typeCode); + } + this._schemaOffset = buffer.position - this._startPos; + // write schema + for (let field of this._fields.values()) { + field._writeOffset(buffer, this._startPos); + } + this._length = buffer.position - this._startPos; + this._buffer = buffer; + // write header + this._writeHeader(); + this._buffer.position = this._startPos + this._length; + this._modified = false; + } + + if (Logger.debug) { + Logger.logDebug('BinaryObject._write [' + [...this._buffer.getSlice(this._startPos, this._startPos + this._length)] + ']'); + } + } + + /** + * @ignore + */ + _writeHeader() { + this._buffer.position = this._startPos; + // type code + this._buffer.writeByte(BinaryUtils.TYPE_CODE.COMPLEX_OBJECT); + // version + this._buffer.writeByte(VERSION); + // flags + this._buffer.writeShort(FLAG_USER_TYPE | FLAG_HAS_SCHEMA | FLAG_COMPACT_FOOTER); + // type id + this._buffer.writeInteger(this._typeBuilder.getTypeId()); + // hash code + this._buffer.writeInteger(BinaryUtils.contentHashCode( + this._buffer, this._startPos + HEADER_LENGTH, this._schemaOffset - 1)); + // length + this._buffer.writeInteger(this._length); + // schema id + this._buffer.writeInteger(this._typeBuilder.getSchemaId()); + // schema offset + this._buffer.writeInteger(this._schemaOffset); + } + + /** + * @ignore + */ + async _read() { + await this._readHeader(); + this._buffer.position = this._startPos + this._schemaOffset; + const fieldOffsets = new Array(); + const fieldIds = this._typeBuilder._schema.fieldIds; + let index = 0; + let fieldId; + while (this._buffer.position < this._startPos + this._length) { + if (!this._compactFooter) { + fieldId = this._buffer.readInteger(); + this._typeBuilder._schema.addField(fieldId); + } + else { + if (index >= fieldIds.length) { + throw Errors.IgniteClientError.serializationError( + false, 'wrong number of fields in schema'); + } + fieldId = fieldIds[index]; + index++; + } + fieldOffsets.push([fieldId, this._buffer.readNumber(this._offsetType)]); + } + fieldOffsets.sort((val1, val2) => val1[1] - val2[1]); + let offset; + let nextOffset; + let field; + for (let i = 0; i < fieldOffsets.length; i++) { + fieldId = fieldOffsets[i][0]; + offset = fieldOffsets[i][1]; + nextOffset = i + 1 < fieldOffsets.length ? fieldOffsets[i + 1][1] : this._schemaOffset; + field = BinaryObjectField._fromBuffer( + this._buffer, this._startPos + offset, nextOffset - offset, fieldId); + this._fields.set(field.id, field); + } + this._buffer.position = this._startPos + this._length; + } + + /** + * @ignore + */ + async _readHeader() { + // type code + this._buffer.readByte(); + // version + const version = this._buffer.readByte(); + if (version !== VERSION) { + throw Errors.IgniteClientError.internalError(); + } + // flags + const flags = this._buffer.readShort(); + // type id + const typeId = this._buffer.readInteger(); + // hash code + this._buffer.readInteger(); + // length + this._length = this._buffer.readInteger(); + // schema id + const schemaId = this._buffer.readInteger(); + // schema offset + this._schemaOffset = this._buffer.readInteger(); + const hasSchema = BinaryObject._isFlagSet(flags, FLAG_HAS_SCHEMA); + this._compactFooter = BinaryObject._isFlagSet(flags, FLAG_COMPACT_FOOTER); + this._offsetType = BinaryObject._isFlagSet(flags, FLAG_OFFSET_ONE_BYTE) ? + BinaryUtils.TYPE_CODE.BYTE : + BinaryObject._isFlagSet(flags, FLAG_OFFSET_TWO_BYTES) ? + BinaryUtils.TYPE_CODE.SHORT : + BinaryUtils.TYPE_CODE.INTEGER; + + if (BinaryObject._isFlagSet(FLAG_HAS_RAW_DATA)) { + throw Errors.IgniteClientError.serializationError( + false, 'complex objects with raw data are not supported'); + } + if (this._compactFooter && !hasSchema) { + throw Errors.IgniteClientError.serializationError( + false, 'schema is absent for object with compact footer'); + } + this._typeBuilder = await BinaryTypeBuilder.fromTypeId(typeId, schemaId, hasSchema); + } +} + +/** + * @ignore + */ +class BinaryObjectField { + constructor(name, value = undefined, type = null) { + this._name = name; + this._id = BinaryField._calculateId(name); + this._value = value; + this._type = type; + if (!type && value !== undefined && value !== null) { + this._type = BinaryUtils.calcObjectType(value); + } + this._typeCode = null; + if (this._type) { + this._typeCode = BinaryUtils.getTypeCode(this._type); + } + } + + get id() { + return this._id; + } + + get typeCode() { + return this._typeCode; + } + + async getValue(type = null) { + if (this._value === undefined || this._buffer && this._type !== type) { + this._buffer.position = this._offset; + const BinaryReader = require('./internal/BinaryReader'); + this._value = await BinaryReader.readObject(this._buffer, type); + this._type = type; + } + return this._value; + } + + static _fromBuffer(buffer, offset, length, id) { + const result = new BinaryObjectField(null); + result._id = id; + result._buffer = buffer; + result._offset = offset; + result._length = length; + return result; + } + + async _writeValue(buffer, expectedTypeCode) { + const offset = buffer.position; + if (this._buffer) { + buffer.writeBuffer(this._buffer.buffer, this._offset, this._offset + this._length); + } + else { + BinaryUtils.checkCompatibility(this._value, expectedTypeCode); + await BinaryWriter.writeObject(buffer, this._value, this._type); + } + this._buffer = buffer; + this._length = buffer.position - offset; + this._offset = offset; + } + + _writeOffset(buffer, headerStartPos) { + buffer.writeInteger(this._offset - headerStartPos); + } +} + +module.exports = BinaryObject; http://git-wip-us.apache.org/repos/asf/ignite/blob/c56d16fb/modules/platforms/nodejs/lib/CacheClient.js ---------------------------------------------------------------------- diff --git a/modules/platforms/nodejs/lib/CacheClient.js b/modules/platforms/nodejs/lib/CacheClient.js new file mode 100644 index 0000000..b76471f --- /dev/null +++ b/modules/platforms/nodejs/lib/CacheClient.js @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const BinaryUtils = require('./internal/BinaryUtils'); +const BinaryReader = require('./internal/BinaryReader'); +const BinaryWriter = require('./internal/BinaryWriter'); +const ArgumentChecker = require('./internal/ArgumentChecker'); +const SqlQuery = require('./Query').SqlQuery; +const SqlFieldsQuery = require('./Query').SqlFieldsQuery; +const ScanQuery = require('./Query').ScanQuery; + +/** + * Peek modes + * @typedef CacheClient.PEEK_MODE + * @enum + * @readonly + * @property ALL 0 + * @property NEAR 1 + * @property PRIMARY 2 + * @property BACKUP 3 + */ +const PEEK_MODE = Object.freeze({ + ALL : 0, + NEAR : 1, + PRIMARY : 2, + BACKUP : 3 +}); + +/** + * Class representing and providing access to Ignite cache. + * + * The class has no public constructor. An instance of this class should be obtained + * via the methods of {@link IgniteClient} objects. + * One instance of this class provides access to one Ignite cache which is specified + * during the instance obtaining and cannot be changed after that. + * + * There are three groups of methods in the cache client: + * - methods to configure the cache client + * - methods to operate with the cache using Key-Value Queries + * - methods to operate with the cache using SQL and Scan Queries + * + * @hideconstructor + */ +class CacheClient { + + static get PEEK_MODE() { + return PEEK_MODE; + } + + /* Methods to configure the cache client */ + + /** + * Specifies a type of the cache key. + * + * The cache client assumes that keys in all further operations with the cache + * will have the specified type. + * Eg. the cache client will convert keys provided as input parameters of the methods + * to the specified object type before sending them to a server. + * + * After the cache client creation a type of the cache key is not specified (null). + * + * If the type is not specified then during operations the cache client + * will do automatic mapping between some of the JavaScript types and object types - + * according to the mapping table defined in the description of the {@link ObjectType} class. + * + * @param {ObjectType.PRIMITIVE_TYPE | CompositeType} type - type of the keys in the cache: + * - either a type code of primitive (simple) type + * - or an instance of class representing non-primitive (composite) type + * - or null (means the type is not specified). + * + * @return {CacheClient} - the same instance of the cache client. + * + * @throws {IgniteClientError} if error. + */ + setKeyType(type) { + BinaryUtils.checkObjectType(type, 'type'); + this._keyType = type; + return this; + } + + /** + * Specifies a type of the cache value. + * + * The cache client assumes that values in all further operations with the cache + * will have the specified type. + * Eg. the cache client will convert values provided as input parameters of the methods + * to the specified object type before sending them to a server. + * + * After the cache client creation a type of the cache value is not specified (null). + * + * If the type is not specified then during operations the cache client + * will do automatic mapping between some of the JavaScript types and object types - + * according to the mapping table defined in the description of the {@link ObjectType} class. + * + * @param {ObjectType.PRIMITIVE_TYPE | CompositeType} type - type of the values in the cache: + * - either a type code of primitive (simple) type + * - or an instance of class representing non-primitive (composite) type + * - or null (means the type is not specified). + * + * @return {CacheClient} - the same instance of the cache client. + * + * @throws {IgniteClientError} if error. + */ + setValueType(type) { + BinaryUtils.checkObjectType(type, 'type'); + this._valueType = type; + return this; + } + + /* Methods to operate with the cache using Key-Value Queries */ + + /** + * Retrieves a value associated with the specified key from the cache. + * + * @async + * + * @param {*} key - key. + * + * @return {Promise<*>} - value associated with the specified key, or null if it does not exist. + * + * @throws {IgniteClientError} if error. + */ + async get(key) { + return await this._writeKeyReadValueOp(BinaryUtils.OPERATION.CACHE_GET, key); + } + + /** + * Retrieves entries associated with the specified keys from the cache. + * + * @async + * + * @param {Array<*>} keys - keys. + * + * @return {Promise<Array<CacheEntry>>} - the retrieved entries (key-value pairs). + * Entries with the keys which do not exist in the cache are not included into the array. + * + * @throws {IgniteClientError} if error. + */ + async getAll(keys) { + ArgumentChecker.notEmpty(keys, 'keys'); + ArgumentChecker.hasType(keys, 'keys', false, Array); + let result = null; + await this._socket.send( + BinaryUtils.OPERATION.CACHE_GET_ALL, + async (payload) => { + this._writeCacheInfo(payload); + await this._writeKeys(payload, keys); + }, + async (payload) => { + const resultCount = payload.readInteger(); + result = new Array(resultCount); + for (let i = 0; i < resultCount; i++) { + result[i] = new CacheEntry( + await BinaryReader.readObject(payload, this._getKeyType()), + await BinaryReader.readObject(payload, this._getValueType())); + } + }); + return result; + } + + /** + * Associates the specified value with the specified key in the cache. + * + * Overwrites the previous value if the key exists in the cache, + * otherwise creates new entry (key-value pair). + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be associated with the specified key. + * + * @throws {IgniteClientError} if error. + */ + async put(key, value) { + await this._writeKeyValueOp(BinaryUtils.OPERATION.CACHE_PUT, key, value); + } + + /** + * Associates the specified values with the specified keys in the cache. + * + * Overwrites the previous value if a key exists in the cache, + * otherwise creates new entry (key-value pair). + * + * @async + * + * @param {Array<CacheEntry>} entries - entries (key-value pairs) to be put into the cache. + * + * @throws {IgniteClientError} if error. + */ + async putAll(entries) { + ArgumentChecker.notEmpty(entries, 'entries'); + ArgumentChecker.hasType(entries, 'entries', true, CacheEntry); + await this._socket.send( + BinaryUtils.OPERATION.CACHE_PUT_ALL, + async (payload) => { + this._writeCacheInfo(payload); + payload.writeInteger(entries.length); + for (let entry of entries) { + await this._writeKeyValue(payload, entry.getKey(), entry.getValue()); + } + }); + } + + /** + * Checks if the specified key exists in the cache. + * + * @async + * + * @param {*} key - key to check. + * + * @return {Promise<boolean>} - true if the key exists, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + async containsKey(key) { + return await this._writeKeyReadBooleanOp(BinaryUtils.OPERATION.CACHE_CONTAINS_KEY, key); + } + + /** + * Checks if all the specified keys exist in the cache. + * + * @async + * + * @param {Array<*>} keys - keys to check. + * + * @return {Promise<boolean>} - true if all the keys exist, + * false if at least one of the keys does not exist in the cache. + * + * @throws {IgniteClientError} if error. + */ + async containsKeys(keys) { + return await this._writeKeysReadBooleanOp(BinaryUtils.OPERATION.CACHE_CONTAINS_KEYS, keys); + } + + /** + * Associates the specified value with the specified key in the cache + * and returns the previous associated value, if any. + * + * Overwrites the previous value if the key exists in the cache, + * otherwise creates new entry (key-value pair). + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be associated with the specified key. + * + * @return {Promise<*>} - the previous value associated with the specified key, or null if it did not exist. + * + * @throws {IgniteClientError} if error. + */ + async getAndPut(key, value) { + return await this._writeKeyValueReadValueOp(BinaryUtils.OPERATION.CACHE_GET_AND_PUT, key, value); + } + + /** + * Associates the specified value with the specified key in the cache + * and returns the previous associated value, if the key exists in the cache. + * Otherwise does nothing and returns null. + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be associated with the specified key. + * + * @return {Promise<*>} - the previous value associated with the specified key, or null if it did not exist. + * + * @throws {IgniteClientError} if error. + */ + async getAndReplace(key, value) { + return await this._writeKeyValueReadValueOp(BinaryUtils.OPERATION.CACHE_GET_AND_REPLACE, key, value); + } + + /** + * Removes the cache entry with the specified key + * and returns the last associated value, if any. + * + * @async + * + * @param {*} key - key of the entry to be removed. + * + * @return {Promise<*>} - the last value associated with the specified key, or null if it did not exist. + * + * @throws {IgniteClientError} if error. + */ + async getAndRemove(key) { + return await this._writeKeyReadValueOp(BinaryUtils.OPERATION.CACHE_GET_AND_REMOVE, key); + } + + /** + * Creates new entry (key-value pair) if the specified key does not exist in the cache. + * Otherwise does nothing. + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be associated with the specified key. + * + * @return {Promise<boolean>} - true if the operation has been done, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + async putIfAbsent(key, value) { + return await this._writeKeyValueReadBooleanOp(BinaryUtils.OPERATION.CACHE_PUT_IF_ABSENT, key, value); + } + + /** + * Creates new entry (key-value pair) if the specified key does not exist in the cache. + * Otherwise returns the current value associated with the existing key. + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be associated with the specified key. + * + * @return {Promise<*>} - the current value associated with the key if it already exists in the cache, + * null if the new entry is created. + * + * @throws {IgniteClientError} if error. + */ + async getAndPutIfAbsent(key, value) { + return await this._writeKeyValueReadValueOp(BinaryUtils.OPERATION.CACHE_GET_AND_PUT_IF_ABSENT, key, value); + } + + /** + * Associates the specified value with the specified key, if the key exists in the cache. + * Otherwise does nothing. + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be associated with the specified key. + * + * @return {Promise<boolean>} - true if the operation has been done, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + async replace(key, value) { + return await this._writeKeyValueReadBooleanOp(BinaryUtils.OPERATION.CACHE_REPLACE, key, value); + } + + /** + * Associates the new value with the specified key, if the key exists in the cache + * and the current value equals to the provided one. + * Otherwise does nothing. + * + * @async + * + * @param {*} key - key. + * @param {*} value - value to be compared with the current value associated with the specified key. + * @param {*} newValue - new value to be associated with the specified key. + * + * @return {Promise<boolean>} - true if the operation has been done, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + async replaceIfEquals(key, value, newValue) { + ArgumentChecker.notNull(key, 'key'); + ArgumentChecker.notNull(value, 'value'); + ArgumentChecker.notNull(newValue, 'newValue'); + let result; + await this._socket.send( + BinaryUtils.OPERATION.CACHE_REPLACE_IF_EQUALS, + async (payload) => { + this._writeCacheInfo(payload); + await this._writeKeyValue(payload, key, value); + await BinaryWriter.writeObject(payload, newValue, this._getValueType()); + }, + async (payload) => { + result = payload.readBoolean(); + }); + return result; + } + + /** + * Removes all entries from the cache, without notifying listeners and cache writers. + * + * @async + * + * @throws {IgniteClientError} if error. + */ + async clear() { + await this._socket.send( + BinaryUtils.OPERATION.CACHE_CLEAR, + async (payload) => { + this._writeCacheInfo(payload); + }); + } + + /** + * Removes entry with the specified key from the cache, without notifying listeners and cache writers. + * + * @async + * + * @param {*} key - key to be removed. + * + * @throws {IgniteClientError} if error. + */ + async clearKey(key) { + await this._writeKeyOp(BinaryUtils.OPERATION.CACHE_CLEAR_KEY, key); + } + + /** + * Removes entries with the specified keys from the cache, without notifying listeners and cache writers. + * + * @async + * + * @param {Array<*>} keys - keys to be removed. + * + * @throws {IgniteClientError} if error. + */ + async clearKeys(keys) { + await this._writeKeysOp(BinaryUtils.OPERATION.CACHE_CLEAR_KEYS, keys); + } + + /** + * Removes entry with the specified key from the cache, notifying listeners and cache writers. + * + * @async + * + * @param {*} key - key to be removed. + * + * @return {Promise<boolean>} - true if the operation has been done, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + async removeKey(key) { + return await this._writeKeyReadBooleanOp(BinaryUtils.OPERATION.CACHE_REMOVE_KEY, key); + } + + /** + * Removes entry with the specified key from the cache, if the current value equals to the provided one. + * Notifies listeners and cache writers. + * + * @async + * + * @param {*} key - key to be removed. + * @param {*} value - value to be compared with the current value associated with the specified key. + * + * @return {Promise<boolean>} - true if the operation has been done, false otherwise. + * + * @throws {IgniteClientError} if error. + */ + async removeIfEquals(key, value) { + return await this._writeKeyValueReadBooleanOp(BinaryUtils.OPERATION.CACHE_REMOVE_IF_EQUALS, key, value); + } + + /** + * Removes entries with the specified keys from the cache, notifying listeners and cache writers. + * + * @async + * + * @param {Array<*>} keys - keys to be removed. + * + * @throws {IgniteClientError} if error. + */ + async removeKeys(keys) { + await this._writeKeysOp(BinaryUtils.OPERATION.CACHE_REMOVE_KEYS, keys); + } + + /** + * Removes all entries from the cache, notifying listeners and cache writers. + * + * @async + * + * @throws {IgniteClientError} if error. + */ + async removeAll() { + await this._socket.send( + BinaryUtils.OPERATION.CACHE_REMOVE_ALL, + async (payload) => { + this._writeCacheInfo(payload); + }); + } + + /** + * Returns the number of the entries in the cache. + * + * @async + * + * @param {...CacheClient.PEEK_MODE} [peekModes] - peek modes. + * + * @return {Promise<number>} - the number of the entries in the cache. + * + * @throws {IgniteClientError} if error. + */ + async getSize(...peekModes) { + ArgumentChecker.hasValueFrom(peekModes, 'peekModes', true, CacheClient.PEEK_MODE); + let result; + await this._socket.send( + BinaryUtils.OPERATION.CACHE_GET_SIZE, + async (payload) => { + this._writeCacheInfo(payload); + payload.writeInteger(peekModes.length); + for (let mode of peekModes) { + payload.writeByte(mode); + } + }, + async (payload) => { + result = payload.readLong().toNumber(); + }); + return result; + } + + /* Methods to operate with the cache using SQL and Scan Queries */ + + /** + * Starts an SQL or Scan query operation. + * + * @async + * + * @param {SqlQuery | SqlFieldsQuery | ScanQuery} query - query to be executed. + * + * @return {Promise<Cursor>} - cursor to obtain the results of the query operation: + * - {@link SqlFieldsCursor} in case of {@link SqlFieldsQuery} query + * - {@link Cursor} in case of other types of query + * + * @throws {IgniteClientError} if error. + */ + async query(query) { + ArgumentChecker.notNull(query, 'query'); + ArgumentChecker.hasType(query, 'query', false, SqlQuery, SqlFieldsQuery, ScanQuery); + + let value = null; + await this._socket.send( + query._operation, + async (payload) => { + this._writeCacheInfo(payload); + await query._write(payload); + }, + async (payload) => { + value = await query._getCursor(this._socket, payload, this._keyType, this._valueType); + }); + return value; + } + + /** Private methods */ + + /** + * @ignore + */ + constructor(name, config, socket) { + this._name = name; + this._cacheId = CacheClient._calculateId(this._name); + this._config = config; + this._keyType = null; + this._valueType = null; + this._socket = socket; + } + + /** + * @ignore + */ + static _calculateId(name) { + return BinaryUtils.hashCode(name); + } + + /** + * @ignore + */ + _writeCacheInfo(payload) { + payload.writeInteger(this._cacheId); + payload.writeByte(0); + } + + /** + * @ignore + */ + async _writeKeyValue(payload, key, value) { + await BinaryWriter.writeObject(payload, key, this._getKeyType()); + await BinaryWriter.writeObject(payload, value, this._getValueType()); + } + + /** + * @ignore + */ + async _writeKeys(payload, keys) { + payload.writeInteger(keys.length); + for (let key of keys) { + await BinaryWriter.writeObject(payload, key, this._getKeyType()); + } + } + + /** + * @ignore + */ + _getKeyType() { + return this._keyType; + } + + /** + * @ignore + */ + _getValueType() { + return this._valueType; + } + + /** + * @ignore + */ + async _writeKeyValueOp(operation, key, value, payloadReader = null) { + ArgumentChecker.notNull(key, 'key'); + ArgumentChecker.notNull(value, 'value'); + await this._socket.send( + operation, + async (payload) => { + this._writeCacheInfo(payload); + await this._writeKeyValue(payload, key, value); + }, + payloadReader); + } + + /** + * @ignore + */ + async _writeKeyValueReadValueOp(operation, key, value) { + let result = null; + await this._writeKeyValueOp( + operation, key, value, + async (payload) => { + result = await BinaryReader.readObject(payload, this._getValueType()); + }); + return result; + } + + /** + * @ignore + */ + async _writeKeyValueReadBooleanOp(operation, key, value) { + let result = false; + await this._writeKeyValueOp( + operation, key, value, + async (payload) => { + result = payload.readBoolean(); + }); + return result; + } + + /** + * @ignore + */ + async _writeKeyOp(operation, key, payloadReader = null) { + ArgumentChecker.notNull(key, 'key'); + await this._socket.send( + operation, + async (payload) => { + this._writeCacheInfo(payload); + await BinaryWriter.writeObject(payload, key, this._getKeyType()); + }, + payloadReader); + } + + /** + * @ignore + */ + async _writeKeyReadValueOp(operation, key) { + let value = null; + await this._writeKeyOp( + operation, key, + async (payload) => { + value = await BinaryReader.readObject(payload, this._getValueType()); + }); + return value; + } + + /** + * @ignore + */ + async _writeKeyReadBooleanOp(operation, key) { + let result = false; + await this._writeKeyOp( + operation, key, + async (payload) => { + result = payload.readBoolean(); + }); + return result; + } + + /** + * @ignore + */ + async _writeKeysOp(operation, keys, payloadReader = null) { + ArgumentChecker.notEmpty(keys, 'keys'); + ArgumentChecker.hasType(keys, 'keys', false, Array); + await this._socket.send( + operation, + async (payload) => { + this._writeCacheInfo(payload); + await this._writeKeys(payload, keys); + }, + payloadReader); + } + + /** + * @ignore + */ + async _writeKeysReadBooleanOp(operation, keys) { + let result = false; + await this._writeKeysOp( + operation, keys, + async (payload) => { + result = payload.readBoolean(); + }); + return result; + } +} + +/** + * A cache entry (key-value pair). + */ +class CacheEntry { + + /** + * Public constructor. + * + * @param {*} key - key corresponding to this entry. + * @param {*} value - value associated with the key. + * + * @return {CacheEntry} - new CacheEntry instance + */ + constructor(key, value) { + this._key = key; + this._value = value; + } + + /** + * Returns the key corresponding to this entry. + * + * @return {*} - the key corresponding to this entry. + */ + getKey() { + return this._key; + } + + /** + * Returns the value corresponding to this entry. + * + * @return {*} - the value corresponding to this entry. + */ + getValue() { + return this._value; + } +} + +module.exports = CacheClient; +module.exports.CacheEntry = CacheEntry;