http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_reader_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_reader_impl.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_reader_impl.cpp new file mode 100644 index 0000000..e189273 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_reader_impl.cpp @@ -0,0 +1,760 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/interop/interop.h" +#include "ignite/impl/binary/binary_common.h" +#include "ignite/impl/binary/binary_id_resolver.h" +#include "ignite/impl/binary/binary_reader_impl.h" +#include "ignite/impl/binary/binary_utils.h" +#include "ignite/binary/binary_type.h" +#include "ignite/ignite_error.h" +#include "ignite/impl/interop/interop_stream_position_guard.h" + +using namespace ignite::impl::interop; +using namespace ignite::impl::binary; +using namespace ignite::binary; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryReaderImpl::BinaryReaderImpl(InteropInputStream* stream, BinaryIdResolver* idRslvr, + int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff, + int32_t footerBegin, int32_t footerEnd, BinaryOffsetType schemaType) : + stream(stream), idRslvr(idRslvr), pos(pos), usrType(usrType), typeId(typeId), + hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false), elemIdGen(0), elemId(0), + elemCnt(-1), elemRead(0), footerBegin(footerBegin), footerEnd(footerEnd), schemaType(schemaType) + { + // No-op. + } + + BinaryReaderImpl::BinaryReaderImpl(InteropInputStream* stream) : + stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0), len(0), + rawOff(0), rawMode(true), elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0), footerBegin(-1), + footerEnd(-1), schemaType(OFFSET_TYPE_4_BYTE) + { + // No-op. + } + + int8_t BinaryReaderImpl::ReadInt8() + { + return ReadRaw<int8_t>(BinaryUtils::ReadInt8); + } + + int32_t BinaryReaderImpl::ReadInt8Array(int8_t* res, const int32_t len) + { + return ReadRawArray<int8_t>(res, len, BinaryUtils::ReadInt8Array, IGNITE_TYPE_ARRAY_BYTE); + } + + int8_t BinaryReaderImpl::ReadInt8(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadInt8, IGNITE_TYPE_BYTE, static_cast<int8_t>(0)); + } + + int32_t BinaryReaderImpl::ReadInt8Array(const char* fieldName, int8_t* res, const int32_t len) + { + return ReadArray<int8_t>(fieldName, res, len,BinaryUtils::ReadInt8Array, IGNITE_TYPE_ARRAY_BYTE); + } + + bool BinaryReaderImpl::ReadBool() + { + return ReadRaw<bool>(BinaryUtils::ReadBool); + } + + int32_t BinaryReaderImpl::ReadBoolArray(bool* res, const int32_t len) + { + return ReadRawArray<bool>(res, len, BinaryUtils::ReadBoolArray, IGNITE_TYPE_ARRAY_BOOL); + } + + bool BinaryReaderImpl::ReadBool(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadBool, IGNITE_TYPE_BOOL, static_cast<bool>(0)); + } + + int32_t BinaryReaderImpl::ReadBoolArray(const char* fieldName, bool* res, const int32_t len) + { + return ReadArray<bool>(fieldName, res, len,BinaryUtils::ReadBoolArray, IGNITE_TYPE_ARRAY_BOOL); + } + + int16_t BinaryReaderImpl::ReadInt16() + { + return ReadRaw<int16_t>(BinaryUtils::ReadInt16); + } + + int32_t BinaryReaderImpl::ReadInt16Array(int16_t* res, const int32_t len) + { + return ReadRawArray<int16_t>(res, len, BinaryUtils::ReadInt16Array, IGNITE_TYPE_ARRAY_SHORT); + } + + int16_t BinaryReaderImpl::ReadInt16(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadInt16, IGNITE_TYPE_SHORT, static_cast<int16_t>(0)); + } + + int32_t BinaryReaderImpl::ReadInt16Array(const char* fieldName, int16_t* res, const int32_t len) + { + return ReadArray<int16_t>(fieldName, res, len, BinaryUtils::ReadInt16Array, IGNITE_TYPE_ARRAY_SHORT); + } + + uint16_t BinaryReaderImpl::ReadUInt16() + { + return ReadRaw<uint16_t>(BinaryUtils::ReadUInt16); + } + + int32_t BinaryReaderImpl::ReadUInt16Array(uint16_t* res, const int32_t len) + { + return ReadRawArray<uint16_t>(res, len, BinaryUtils::ReadUInt16Array, IGNITE_TYPE_ARRAY_CHAR); + } + + uint16_t BinaryReaderImpl::ReadUInt16(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadUInt16, IGNITE_TYPE_CHAR, static_cast<uint16_t>(0)); + } + + int32_t BinaryReaderImpl::ReadUInt16Array(const char* fieldName, uint16_t* res, const int32_t len) + { + return ReadArray<uint16_t>(fieldName, res, len,BinaryUtils::ReadUInt16Array, IGNITE_TYPE_ARRAY_CHAR); + } + + int32_t BinaryReaderImpl::ReadInt32() + { + return ReadRaw<int32_t>(BinaryUtils::ReadInt32); + } + + int32_t BinaryReaderImpl::ReadInt32Array(int32_t* res, const int32_t len) + { + return ReadRawArray<int32_t>(res, len, BinaryUtils::ReadInt32Array, IGNITE_TYPE_ARRAY_INT); + } + + int32_t BinaryReaderImpl::ReadInt32(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadInt32, IGNITE_TYPE_INT, static_cast<int32_t>(0)); + } + + int32_t BinaryReaderImpl::ReadInt32Array(const char* fieldName, int32_t* res, const int32_t len) + { + return ReadArray<int32_t>(fieldName, res, len,BinaryUtils::ReadInt32Array, IGNITE_TYPE_ARRAY_INT); + } + + int64_t BinaryReaderImpl::ReadInt64() + { + return ReadRaw<int64_t>(BinaryUtils::ReadInt64); + } + + int32_t BinaryReaderImpl::ReadInt64Array(int64_t* res, const int32_t len) + { + return ReadRawArray<int64_t>(res, len, BinaryUtils::ReadInt64Array, IGNITE_TYPE_ARRAY_LONG); + } + + int64_t BinaryReaderImpl::ReadInt64(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadInt64, IGNITE_TYPE_LONG, static_cast<int64_t>(0)); + } + + int32_t BinaryReaderImpl::ReadInt64Array(const char* fieldName, int64_t* res, const int32_t len) + { + return ReadArray<int64_t>(fieldName, res, len,BinaryUtils::ReadInt64Array, IGNITE_TYPE_ARRAY_LONG); + } + + float BinaryReaderImpl::ReadFloat() + { + return ReadRaw<float>(BinaryUtils::ReadFloat); + } + + int32_t BinaryReaderImpl::ReadFloatArray(float* res, const int32_t len) + { + return ReadRawArray<float>(res, len, BinaryUtils::ReadFloatArray, IGNITE_TYPE_ARRAY_FLOAT); + } + + float BinaryReaderImpl::ReadFloat(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadFloat, IGNITE_TYPE_FLOAT, static_cast<float>(0)); + } + + int32_t BinaryReaderImpl::ReadFloatArray(const char* fieldName, float* res, const int32_t len) + { + return ReadArray<float>(fieldName, res, len,BinaryUtils::ReadFloatArray, IGNITE_TYPE_ARRAY_FLOAT); + } + + double BinaryReaderImpl::ReadDouble() + { + return ReadRaw<double>(BinaryUtils::ReadDouble); + } + + int32_t BinaryReaderImpl::ReadDoubleArray(double* res, const int32_t len) + { + return ReadRawArray<double>(res, len, BinaryUtils::ReadDoubleArray, IGNITE_TYPE_ARRAY_DOUBLE); + } + + double BinaryReaderImpl::ReadDouble(const char* fieldName) + { + return Read(fieldName, BinaryUtils::ReadDouble, IGNITE_TYPE_DOUBLE, static_cast<double>(0)); + } + + int32_t BinaryReaderImpl::ReadDoubleArray(const char* fieldName, double* res, const int32_t len) + { + return ReadArray<double>(fieldName, res, len,BinaryUtils::ReadDoubleArray, IGNITE_TYPE_ARRAY_DOUBLE); + } + + Guid BinaryReaderImpl::ReadGuid() + { + CheckRawMode(true); + CheckSingleMode(true); + + return ReadNullable(stream, BinaryUtils::ReadGuid, IGNITE_TYPE_UUID); + } + + int32_t BinaryReaderImpl::ReadGuidArray(Guid* res, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + return ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, IGNITE_TYPE_ARRAY_UUID); + } + + Guid BinaryReaderImpl::ReadGuid(const char* fieldName) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + return Guid(); + + stream->Position(fieldPos); + + return ReadNullable(stream, BinaryUtils::ReadGuid, IGNITE_TYPE_UUID); + } + + int32_t BinaryReaderImpl::ReadGuidArray(const char* fieldName, Guid* res, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + return -1; + + stream->Position(fieldPos); + + int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, IGNITE_TYPE_ARRAY_UUID); + + return realLen; + } + + void BinaryReaderImpl::ReadGuidArrayInternal(InteropInputStream* stream, Guid* res, const int32_t len) + { + for (int i = 0; i < len; i++) + *(res + i) = ReadNullable<Guid>(stream, BinaryUtils::ReadGuid, IGNITE_TYPE_UUID); + } + + int32_t BinaryReaderImpl::ReadString(char* res, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + return ReadStringInternal(res, len); + } + + int32_t BinaryReaderImpl::ReadString(const char* fieldName, char* res, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + return -1; + + stream->Position(fieldPos); + + int32_t realLen = ReadStringInternal(res, len); + + return realLen; + } + + int32_t BinaryReaderImpl::ReadStringArray(int32_t* size) + { + return StartContainerSession(true, IGNITE_TYPE_ARRAY_STRING, size); + } + + int32_t BinaryReaderImpl::ReadStringArray(const char* fieldName, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + { + *size = -1; + + return ++elemIdGen; + } + + stream->Position(fieldPos); + + return StartContainerSession(false, IGNITE_TYPE_ARRAY_STRING, size); + } + + int32_t BinaryReaderImpl::ReadStringElement(int32_t id, char* res, const int32_t len) + { + CheckSession(id); + + int32_t posBefore = stream->Position(); + + int32_t realLen = ReadStringInternal(res, len); + + int32_t posAfter = stream->Position(); + + if (posAfter > posBefore && ++elemRead == elemCnt) { + elemId = 0; + elemCnt = -1; + elemRead = 0; + } + + return realLen; + } + + int32_t BinaryReaderImpl::ReadStringInternal(char* res, const int32_t len) + { + int8_t hdr = stream->ReadInt8(); + + if (hdr == IGNITE_TYPE_STRING) { + int32_t realLen = stream->ReadInt32(); + + if (res && len >= realLen) { + for (int i = 0; i < realLen; i++) + *(res + i) = static_cast<char>(stream->ReadInt8()); + + if (len > realLen) + *(res + realLen) = 0; // Set NULL terminator if possible. + } + else + stream->Position(stream->Position() - 4 - 1); + + return realLen; + } + else if (hdr != IGNITE_HDR_NULL) + ThrowOnInvalidHeader(IGNITE_TYPE_ARRAY, hdr); + + return -1; + } + + int32_t BinaryReaderImpl::ReadArray(int32_t* size) + { + return StartContainerSession(true, IGNITE_TYPE_ARRAY, size); + } + + int32_t BinaryReaderImpl::ReadArray(const char* fieldName, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + { + *size = -1; + + return ++elemIdGen; + } + + stream->Position(fieldPos); + + return StartContainerSession(false, IGNITE_TYPE_ARRAY, size); + } + + int32_t BinaryReaderImpl::ReadCollection(CollectionType* typ, int32_t* size) + { + int32_t id = StartContainerSession(true, IGNITE_TYPE_COLLECTION, size); + + if (*size == -1) + *typ = IGNITE_COLLECTION_UNDEFINED; + else + *typ = static_cast<CollectionType>(stream->ReadInt8()); + + return id; + } + + int32_t BinaryReaderImpl::ReadCollection(const char* fieldName, CollectionType* typ, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + { + *typ = IGNITE_COLLECTION_UNDEFINED; + *size = -1; + + return ++elemIdGen; + } + + stream->Position(fieldPos); + + int32_t id = StartContainerSession(false, IGNITE_TYPE_COLLECTION, size); + + if (*size == -1) + *typ = IGNITE_COLLECTION_UNDEFINED; + else + *typ = static_cast<CollectionType>(stream->ReadInt8()); + + return id; + } + + int32_t BinaryReaderImpl::ReadMap(MapType* typ, int32_t* size) + { + int32_t id = StartContainerSession(true, IGNITE_TYPE_MAP, size); + + if (*size == -1) + *typ = IGNITE_MAP_UNDEFINED; + else + *typ = static_cast<MapType>(stream->ReadInt8()); + + return id; + } + + int32_t BinaryReaderImpl::ReadMap(const char* fieldName, MapType* typ, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + { + *typ = IGNITE_MAP_UNDEFINED; + *size = -1; + + return ++elemIdGen; + } + + stream->Position(fieldPos); + + int32_t id = StartContainerSession(false, IGNITE_TYPE_MAP, size); + + if (*size == -1) + *typ = IGNITE_MAP_UNDEFINED; + else + *typ = static_cast<MapType>(stream->ReadInt8()); + + return id; + } + + CollectionType BinaryReaderImpl::ReadCollectionTypeUnprotected() + { + int32_t size = ReadCollectionSizeUnprotected(); + if (size == -1) + return IGNITE_COLLECTION_UNDEFINED; + + CollectionType typ = static_cast<CollectionType>(stream->ReadInt8()); + + return typ; + } + + CollectionType BinaryReaderImpl::ReadCollectionType() + { + InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream); + + return ReadCollectionTypeUnprotected(); + } + + CollectionType BinaryReaderImpl::ReadCollectionType(const char* fieldName) + { + CheckRawMode(false); + CheckSingleMode(true); + + InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + return IGNITE_COLLECTION_UNDEFINED; + + stream->Position(fieldPos); + + return ReadCollectionTypeUnprotected(); + } + + int32_t BinaryReaderImpl::ReadCollectionSizeUnprotected() + { + int8_t hdr = stream->ReadInt8(); + + if (hdr != IGNITE_TYPE_COLLECTION) + { + if (hdr != IGNITE_HDR_NULL) + ThrowOnInvalidHeader(IGNITE_TYPE_COLLECTION, hdr); + + return -1; + } + + int32_t size = stream->ReadInt32(); + + return size; + } + + int32_t BinaryReaderImpl::ReadCollectionSize() + { + InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream); + + return ReadCollectionSizeUnprotected(); + } + + int32_t BinaryReaderImpl::ReadCollectionSize(const char* fieldName) + { + CheckRawMode(false); + CheckSingleMode(true); + + InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldPos = FindField(fieldId); + + if (fieldPos <= 0) + return -1; + + stream->Position(fieldPos); + + return ReadCollectionSizeUnprotected(); + } + + bool BinaryReaderImpl::HasNextElement(int32_t id) const + { + return elemId == id && elemRead < elemCnt; + } + + void BinaryReaderImpl::SetRawMode() + { + CheckRawMode(false); + CheckSingleMode(true); + + stream->Position(pos + rawOff); + rawMode = true; + } + + template <> + int8_t BinaryReaderImpl::ReadTopObject<int8_t>() + { + return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8, static_cast<int8_t>(0)); + } + + template <> + bool BinaryReaderImpl::ReadTopObject<bool>() + { + return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool, static_cast<bool>(0)); + } + + template <> + int16_t BinaryReaderImpl::ReadTopObject<int16_t>() + { + return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16, static_cast<int16_t>(0)); + } + + template <> + uint16_t BinaryReaderImpl::ReadTopObject<uint16_t>() + { + return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16, static_cast<uint16_t>(0)); + } + + template <> + int32_t BinaryReaderImpl::ReadTopObject<int32_t>() + { + return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32, static_cast<int32_t>(0)); + } + + template <> + int64_t BinaryReaderImpl::ReadTopObject<int64_t>() + { + return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64, static_cast<int64_t>(0)); + } + + template <> + float BinaryReaderImpl::ReadTopObject<float>() + { + return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat, static_cast<float>(0)); + } + + template <> + double BinaryReaderImpl::ReadTopObject<double>() + { + return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble, static_cast<double>(0)); + } + + template <> + Guid BinaryReaderImpl::ReadTopObject<Guid>() + { + int8_t typeId = stream->ReadInt8(); + + if (typeId == IGNITE_TYPE_UUID) + return BinaryUtils::ReadGuid(stream); + else if (typeId == IGNITE_HDR_NULL) + return Guid(); + else { + int32_t pos = stream->Position() - 1; + + IGNITE_ERROR_FORMATTED_3(IgniteError::IGNITE_ERR_BINARY, "Invalid header", "position", pos, "expected", IGNITE_TYPE_UUID, "actual", typeId) + } + } + + InteropInputStream* BinaryReaderImpl::GetStream() + { + return stream; + } + + int32_t BinaryReaderImpl::FindField(const int32_t fieldId) + { + InteropStreamPositionGuard<InteropInputStream> streamGuard(*stream); + + stream->Position(footerBegin); + + switch (schemaType) + { + case OFFSET_TYPE_1_BYTE: + { + for (int32_t schemaPos = footerBegin; schemaPos < footerEnd; schemaPos += 5) + { + int32_t currentFieldId = stream->ReadInt32(schemaPos); + + if (fieldId == currentFieldId) + return static_cast<uint8_t>(stream->ReadInt8(schemaPos + 4)) + pos; + } + break; + } + + case OFFSET_TYPE_2_BYTE: + { + for (int32_t schemaPos = footerBegin; schemaPos < footerEnd; schemaPos += 6) + { + int32_t currentFieldId = stream->ReadInt32(schemaPos); + + if (fieldId == currentFieldId) + return static_cast<uint16_t>(stream->ReadInt16(schemaPos + 4)) + pos; + } + break; + } + + case OFFSET_TYPE_4_BYTE: + { + for (int32_t schemaPos = footerBegin; schemaPos < footerEnd; schemaPos += 8) + { + int32_t currentFieldId = stream->ReadInt32(schemaPos); + + if (fieldId == currentFieldId) + return stream->ReadInt32(schemaPos + 4) + pos; + } + break; + } + } + + return -1; + } + + void BinaryReaderImpl::CheckRawMode(bool expected) const + { + if (expected && !rawMode) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation can be performed only in raw mode.") + } + else if (!expected && rawMode) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation cannot be performed in raw mode.") + } + } + + void BinaryReaderImpl::CheckSingleMode(bool expected) const + { + if (expected && elemId != 0) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation cannot be performed when container is being read."); + } + else if (!expected && elemId == 0) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation can be performed only when container is being read."); + } + } + + int32_t BinaryReaderImpl::StartContainerSession(bool expRawMode, int8_t expHdr, int32_t* size) + { + CheckRawMode(expRawMode); + CheckSingleMode(true); + + int8_t hdr = stream->ReadInt8(); + + if (hdr == expHdr) + { + int32_t cnt = stream->ReadInt32(); + + if (cnt != 0) + { + elemId = ++elemIdGen; + elemCnt = cnt; + elemRead = 0; + + *size = cnt; + + return elemId; + } + else + { + *size = 0; + + return ++elemIdGen; + } + } + else if (hdr == IGNITE_HDR_NULL) { + *size = -1; + + return ++elemIdGen; + } + else { + ThrowOnInvalidHeader(expHdr, hdr); + + return 0; + } + } + + void BinaryReaderImpl::CheckSession(int32_t expSes) const + { + if (elemId != expSes) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Containter read session has been finished or is not started yet."); + } + } + + void BinaryReaderImpl::ThrowOnInvalidHeader(int32_t pos, int8_t expHdr, int8_t hdr) + { + IGNITE_ERROR_FORMATTED_3(IgniteError::IGNITE_ERR_BINARY, "Invalid header", "position", pos, "expected", expHdr, "actual", hdr) + } + + void BinaryReaderImpl::ThrowOnInvalidHeader(int8_t expHdr, int8_t hdr) const + { + int32_t pos = stream->Position() - 1; + + ThrowOnInvalidHeader(pos, expHdr, hdr); + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_schema.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_schema.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_schema.cpp new file mode 100644 index 0000000..1596557 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_schema.cpp @@ -0,0 +1,135 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include <cassert> + +#include "ignite/impl/binary/binary_schema.h" +#include "ignite/impl/binary/binary_writer_impl.h" + +/** FNV1 hash offset basis. */ +enum { FNV1_OFFSET_BASIS = 0x811C9DC5 }; + +/** FNV1 hash prime. */ +enum { FNV1_PRIME = 0x01000193 }; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinarySchema::BinarySchema(): id(0), fieldsInfo(new FieldContainer()) + { + // No-op. + } + + BinarySchema::~BinarySchema() + { + delete fieldsInfo; + } + + void BinarySchema::AddField(int32_t fieldId, int32_t offset) + { + if (!id) + { + // Initialize offset when the first field is written. + id = FNV1_OFFSET_BASIS; + } + + // Advance schema hash. + int32_t idAccumulator = id ^ (fieldId & 0xFF); + idAccumulator *= FNV1_PRIME; + idAccumulator ^= (fieldId >> 8) & 0xFF; + idAccumulator *= FNV1_PRIME; + idAccumulator ^= (fieldId >> 16) & 0xFF; + idAccumulator *= FNV1_PRIME; + idAccumulator ^= (fieldId >> 24) & 0xFF; + idAccumulator *= FNV1_PRIME; + + id = idAccumulator; + + BinarySchemaFieldInfo info = { fieldId, offset }; + fieldsInfo->push_back(info); + } + + void BinarySchema::Write(interop::InteropOutputStream& out) const + { + switch (GetType()) + { + case OFFSET_TYPE_1_BYTE: + { + for (FieldContainer::const_iterator i = fieldsInfo->begin(); i != fieldsInfo->end(); ++i) + { + out.WriteInt32(i->id); + out.WriteInt8(static_cast<int8_t>(i->offset)); + } + break; + } + + case OFFSET_TYPE_2_BYTE: + { + for (FieldContainer::const_iterator i = fieldsInfo->begin(); i != fieldsInfo->end(); ++i) + { + out.WriteInt32(i->id); + out.WriteInt16(static_cast<int16_t>(i->offset)); + } + break; + } + + case OFFSET_TYPE_4_BYTE: + { + for (FieldContainer::const_iterator i = fieldsInfo->begin(); i != fieldsInfo->end(); ++i) + { + out.WriteInt32(i->id); + out.WriteInt32(i->offset); + } + break; + } + + default: + { + assert(false); + break; + } + } + } + + bool BinarySchema::Empty() const + { + return fieldsInfo->empty(); + } + + void BinarySchema::Clear() + { + id = 0; + fieldsInfo->clear(); + } + + BinaryOffsetType BinarySchema::GetType() const + { + int32_t maxOffset = fieldsInfo->back().offset; + + if (maxOffset < 0x100) + return OFFSET_TYPE_1_BYTE; + else if (maxOffset < 0x10000) + return OFFSET_TYPE_2_BYTE; + + return OFFSET_TYPE_4_BYTE; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_type_handler.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_type_handler.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_type_handler.cpp new file mode 100644 index 0000000..5e70707 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_type_handler.cpp @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_handler.h" + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeHandler::BinaryTypeHandler(SPSnap snap) : snap(snap), fieldIds(NULL), fields(NULL) + { + // No-op. + } + + BinaryTypeHandler::~BinaryTypeHandler() + { + if (fieldIds) + delete fieldIds; + + if (fields) + delete fields; + } + + void BinaryTypeHandler::OnFieldWritten(int32_t fieldId, std::string fieldName, int32_t fieldTypeId) + { + if (!snap.Get() || !snap.Get()->ContainsFieldId(fieldId)) + { + if (!HasDifference()) + { + fieldIds = new std::set<int32_t>(); + fields = new std::map<std::string, int32_t>(); + } + + fieldIds->insert(fieldId); + (*fields)[fieldName] = fieldTypeId; + } + } + + SPSnap BinaryTypeHandler::GetSnapshot() + { + return snap; + } + + bool BinaryTypeHandler::HasDifference() + { + return fieldIds ? true : false; + } + + std::set<int32_t>* BinaryTypeHandler::GetFieldIds() + { + return fieldIds; + } + + std::map<std::string, int32_t>* BinaryTypeHandler::GetFields() + { + return fields; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_type_manager.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_type_manager.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_type_manager.cpp new file mode 100644 index 0000000..9bd115c --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_type_manager.cpp @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/common/concurrent.h> + +#include "ignite/impl/binary/binary_type_manager.h" + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeManager::BinaryTypeManager() : + snapshots(SharedPointer<std::map<int32_t, SPSnap>>(new std::map<int32_t, SPSnap>)), + pending(new std::vector<SPSnap>()), + cs(new CriticalSection()), + pendingVer(0), ver(0) + { + // No-op. + } + + BinaryTypeManager::~BinaryTypeManager() + { + pending->erase(pending->begin(), pending->end()); + + delete pending; + delete cs; + } + + SharedPointer<BinaryTypeHandler> BinaryTypeManager::GetHandler(int32_t typeId) + { + SharedPointer<std::map<int32_t, SPSnap>> snapshots0 = snapshots; + + SPSnap snapshot = (*snapshots0.Get())[typeId]; + + return SharedPointer<BinaryTypeHandler>(new BinaryTypeHandler(snapshot)); + } + + void BinaryTypeManager::SubmitHandler(std::string typeName, int32_t typeId, + BinaryTypeHandler* hnd) + { + Snap* snap = hnd->GetSnapshot().Get(); + + // If this is the very first write of a class or difference exists, + // we need to enqueue it for write. + if (!snap || hnd->HasDifference()) + { + std::set<int32_t>* newFieldIds = new std::set<int32_t>(); + std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>(); + + CopyFields(snap, newFieldIds, newFields); + + if (hnd->HasDifference()) + { + std::set<int32_t>* diffFieldIds = hnd->GetFieldIds(); + std::map<std::string, int32_t>* diffFields = hnd->GetFields(); + + for (std::set<int32_t>::iterator it = diffFieldIds->begin(); it != diffFieldIds->end(); ++it) + newFieldIds->insert(*it); + + for (std::map<std::string, int32_t>::iterator it = diffFields->begin(); it != diffFields->end(); ++it) + (*newFields)[it->first] = it->second; + } + + Snap* diffSnap = new Snap(typeName, typeId, newFieldIds, newFields); + + cs->Enter(); + + pending->push_back(SPSnap(diffSnap)); + + pendingVer++; + + cs->Leave(); + } + } + + int32_t BinaryTypeManager::GetVersion() + { + Memory::Fence(); + + return ver; + } + + bool BinaryTypeManager::IsUpdatedSince(int32_t oldVer) + { + Memory::Fence(); + + return pendingVer > oldVer; + } + + bool BinaryTypeManager::ProcessPendingUpdates(BinaryTypeUpdater* updater, IgniteError* err) + { + bool success = true; // Optimistically assume that all will be fine. + + cs->Enter(); + + for (std::vector<SPSnap>::iterator it = pending->begin(); it != pending->end(); ++it) + { + Snap* pendingSnap = (*it).Get(); + + if (updater->Update(pendingSnap, err)) + { + // Perform copy-on-write update of snapshot collection. + std::map<int32_t, SPSnap>* newSnapshots = new std::map<int32_t, SPSnap>(); + + bool snapshotFound = false; + + for (std::map<int32_t, SPSnap>::iterator snapIt = snapshots.Get()->begin(); + snapIt != snapshots.Get()->end(); ++snapIt) + { + int32_t curTypeId = snapIt->first; + Snap* curSnap = snapIt->second.Get(); + + if (pendingSnap->GetTypeId() == curTypeId) + { + // Have to create snapshot with updated fields. + std::set<int32_t>* newFieldIds = new std::set<int32_t>(); + std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>(); + + // Add old fields. + CopyFields(curSnap, newFieldIds, newFields); + + // Add new fields. + CopyFields(pendingSnap, newFieldIds, newFields); + + // Create new snapshot. + Snap* newSnap = new Snap(pendingSnap->GetTypeName(), pendingSnap->GetTypeId(), + newFieldIds, newFields); + + (*newSnapshots)[curTypeId] = SPSnap(newSnap); + + snapshotFound = true; + } + else + (*newSnapshots)[curTypeId] = snapIt->second; // Just transfer exising snapshot. + } + + // Handle situation when completely new snapshot is found. + if (!snapshotFound) + (*newSnapshots)[pendingSnap->GetTypeId()] = *it; + + snapshots = SharedPointer<std::map<int32_t, SPSnap>>(newSnapshots); + } + else + { + // Stop as we cannot move further. + success = false; + + break; + } + } + + if (success) + { + pending->erase(pending->begin(), pending->end()); + + ver = pendingVer; + } + + cs->Leave(); + + return success; + } + + void BinaryTypeManager::CopyFields(Snap* snap, std::set<int32_t>* fieldIds, + std::map<std::string, int32_t>* fields) + { + if (snap && snap->HasFields()) + { + std::set<int32_t>* snapFieldIds = snap->GetFieldIds(); + std::map<std::string, int32_t>* snapFields = snap->GetFields(); + + for (std::set<int32_t>::iterator oldIt = snapFieldIds->begin(); + oldIt != snapFieldIds->end(); ++oldIt) + fieldIds->insert(*oldIt); + + for (std::map<std::string, int32_t>::iterator newFieldsIt = snapFields->begin(); + newFieldsIt != snapFields->end(); ++newFieldsIt) + (*fields)[newFieldsIt->first] = newFieldsIt->second; + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_type_snapshot.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_type_snapshot.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_type_snapshot.cpp new file mode 100644 index 0000000..f34732f --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_type_snapshot.cpp @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_snapshot.h" + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeSnapshot::BinaryTypeSnapshot(std::string typeName, int32_t typeId, + std::set<int32_t>* fieldIds, std::map<std::string, int32_t>* fields) : + typeName(typeName), typeId(typeId), fieldIds(fieldIds), fields(fields) + { + // No-op. + } + + BinaryTypeSnapshot::~BinaryTypeSnapshot() + { + delete fieldIds; + delete fields; + } + + bool BinaryTypeSnapshot::ContainsFieldId(int32_t fieldId) + { + return fieldIds && fieldIds->count(fieldId) == 1; + } + + std::string BinaryTypeSnapshot::GetTypeName() + { + return typeName; + } + + int32_t BinaryTypeSnapshot::GetTypeId() + { + return typeId; + } + + bool BinaryTypeSnapshot::HasFields() + { + return !fieldIds->empty(); + } + + std::set<int32_t>* BinaryTypeSnapshot::GetFieldIds() + { + return fieldIds; + } + + std::map<std::string, int32_t>* BinaryTypeSnapshot::GetFields() + { + return fields; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_type_updater.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_type_updater.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_type_updater.cpp new file mode 100644 index 0000000..b3436e9 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_type_updater.cpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_updater.h" + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeUpdater::~BinaryTypeUpdater() + { + // No-op. + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_type_updater_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_type_updater_impl.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_type_updater_impl.cpp new file mode 100644 index 0000000..2e86ccd --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_type_updater_impl.cpp @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_updater_impl.h" +#include "ignite/impl/interop/interop_output_stream.h" +#include "ignite/impl/binary/binary_writer_impl.h" +#include "ignite/binary/binary_raw_writer.h" + +using namespace ignite::common::concurrent; +using namespace ignite::common::java; +using namespace ignite::impl; +using namespace ignite::impl::interop; +using namespace ignite::binary; + +namespace ignite +{ + namespace impl + { + namespace binary + { + /** Operation: Clear. */ + const int32_t OP_METADATA = -1; + + BinaryTypeUpdaterImpl::BinaryTypeUpdaterImpl(SharedPointer<IgniteEnvironment> env, + jobject javaRef) : env(env), javaRef(javaRef) + { + // No-op. + } + + BinaryTypeUpdaterImpl::~BinaryTypeUpdaterImpl() + { + // No-op. + } + + bool BinaryTypeUpdaterImpl::Update(Snap* snap, IgniteError* err) + { + JniErrorInfo jniErr; + + SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory(); + + InteropOutputStream out(mem.Get()); + BinaryWriterImpl writer(&out, NULL); + BinaryRawWriter rawWriter(&writer); + + // We always pass only one meta at a time in current implementation for simplicity. + rawWriter.WriteInt32(1); + + rawWriter.WriteInt32(snap->GetTypeId()); + rawWriter.WriteString(snap->GetTypeName()); + rawWriter.WriteString(NULL); // Affinity key is not supported for now. + + if (snap->HasFields()) + { + std::map<std::string, int32_t>* fields = snap->GetFields(); + + rawWriter.WriteInt32(static_cast<int32_t>(fields->size())); + + for (std::map<std::string, int32_t>::iterator it = fields->begin(); it != fields->end(); ++it) + { + rawWriter.WriteString(it->first); + rawWriter.WriteInt32(it->second); + } + } + else + rawWriter.WriteInt32(0); + + out.Synchronize(); + + long long res = env.Get()->Context()->TargetInStreamOutLong(javaRef, OP_METADATA, mem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return res == 1; + else + return false; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_utils.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_utils.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_utils.cpp new file mode 100644 index 0000000..8e26ea9 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_utils.cpp @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/interop/interop.h" +#include "ignite/impl/binary/binary_utils.h" + +using namespace ignite::impl::interop; +using namespace ignite::impl::binary; + +namespace ignite +{ + namespace impl + { + namespace binary + { + int8_t BinaryUtils::ReadInt8(InteropInputStream* stream) + { + return stream->ReadInt8(); + } + + void BinaryUtils::WriteInt8(InteropOutputStream* stream, int8_t val) + { + stream->WriteInt8(val); + } + + void BinaryUtils::ReadInt8Array(InteropInputStream* stream, int8_t* res, const int32_t len) + { + stream->ReadInt8Array(res, len); + } + + void BinaryUtils::WriteInt8Array(InteropOutputStream* stream, const int8_t* val, const int32_t len) + { + stream->WriteInt8Array(val, len); + } + + bool BinaryUtils::ReadBool(InteropInputStream* stream) + { + return stream->ReadBool(); + } + + void BinaryUtils::WriteBool(InteropOutputStream* stream, bool val) + { + stream->WriteBool(val); + } + + void BinaryUtils::ReadBoolArray(InteropInputStream* stream, bool* res, const int32_t len) + { + stream->ReadBoolArray(res, len); + } + + void BinaryUtils::WriteBoolArray(InteropOutputStream* stream, const bool* val, const int32_t len) + { + stream->WriteBoolArray(val, len); + } + + int16_t BinaryUtils::ReadInt16(InteropInputStream* stream) + { + return stream->ReadInt16(); + } + + void BinaryUtils::WriteInt16(InteropOutputStream* stream, int16_t val) + { + stream->WriteInt16(val); + } + + void BinaryUtils::ReadInt16Array(InteropInputStream* stream, int16_t* res, const int32_t len) + { + stream->ReadInt16Array(res, len); + } + + void BinaryUtils::WriteInt16Array(InteropOutputStream* stream, const int16_t* val, const int32_t len) + { + stream->WriteInt16Array(val, len); + } + + uint16_t BinaryUtils::ReadUInt16(InteropInputStream* stream) + { + return stream->ReadUInt16(); + } + + void BinaryUtils::WriteUInt16(InteropOutputStream* stream, uint16_t val) + { + stream->WriteUInt16(val); + } + + void BinaryUtils::ReadUInt16Array(InteropInputStream* stream, uint16_t* res, const int32_t len) + { + stream->ReadUInt16Array(res, len); + } + + void BinaryUtils::WriteUInt16Array(InteropOutputStream* stream, const uint16_t* val, const int32_t len) + { + stream->WriteUInt16Array(val, len); + } + + int32_t BinaryUtils::ReadInt32(InteropInputStream* stream) + { + return stream->ReadInt32(); + } + + void BinaryUtils::WriteInt32(InteropOutputStream* stream, int32_t val) + { + stream->WriteInt32(val); + } + + void BinaryUtils::ReadInt32Array(InteropInputStream* stream, int32_t* res, const int32_t len) + { + stream->ReadInt32Array(res, len); + } + + void BinaryUtils::WriteInt32Array(InteropOutputStream* stream, const int32_t* val, const int32_t len) + { + stream->WriteInt32Array(val, len); + } + + int64_t BinaryUtils::ReadInt64(InteropInputStream* stream) + { + return stream->ReadInt64(); + } + + void BinaryUtils::WriteInt64(InteropOutputStream* stream, int64_t val) + { + stream->WriteInt64(val); + } + + void BinaryUtils::ReadInt64Array(InteropInputStream* stream, int64_t* res, const int32_t len) + { + stream->ReadInt64Array(res, len); + } + + void BinaryUtils::WriteInt64Array(InteropOutputStream* stream, const int64_t* val, const int32_t len) + { + stream->WriteInt64Array(val, len); + } + + float BinaryUtils::ReadFloat(InteropInputStream* stream) + { + return stream->ReadFloat(); + } + + void BinaryUtils::WriteFloat(InteropOutputStream* stream, float val) + { + stream->WriteFloat(val); + } + + void BinaryUtils::ReadFloatArray(InteropInputStream* stream, float* res, const int32_t len) + { + stream->ReadFloatArray(res, len); + } + + void BinaryUtils::WriteFloatArray(InteropOutputStream* stream, const float* val, const int32_t len) + { + stream->WriteFloatArray(val, len); + } + + double BinaryUtils::ReadDouble(InteropInputStream* stream) + { + return stream->ReadDouble(); + } + + void BinaryUtils::WriteDouble(InteropOutputStream* stream, double val) + { + stream->WriteDouble(val); + } + + void BinaryUtils::ReadDoubleArray(InteropInputStream* stream, double* res, const int32_t len) + { + stream->ReadDoubleArray(res, len); + } + + void BinaryUtils::WriteDoubleArray(InteropOutputStream* stream, const double* val, const int32_t len) + { + stream->WriteDoubleArray(val, len); + } + + Guid BinaryUtils::ReadGuid(interop::InteropInputStream* stream) + { + int64_t most = stream->ReadInt64(); + int64_t least = stream->ReadInt64(); + + return Guid(most, least); + } + + void BinaryUtils::WriteGuid(interop::InteropOutputStream* stream, const Guid val) + { + stream->WriteInt64(val.GetMostSignificantBits()); + stream->WriteInt64(val.GetLeastSignificantBits()); + } + + void BinaryUtils::WriteString(interop::InteropOutputStream* stream, const char* val, const int32_t len) + { + stream->WriteInt32(len); + stream->WriteInt8Array(reinterpret_cast<const int8_t*>(val), len); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/binary/binary_writer_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/binary/binary_writer_impl.cpp b/modules/platforms/cpp/core/src/impl/binary/binary_writer_impl.cpp new file mode 100644 index 0000000..47df19d --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/binary/binary_writer_impl.cpp @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_writer_impl.h" +#include "ignite/impl/interop/interop_stream_position_guard.h" +#include "ignite/ignite_error.h" + +using namespace ignite::impl::interop; +using namespace ignite::impl::binary; +using namespace ignite::binary; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryWriterImpl::BinaryWriterImpl(InteropOutputStream* stream, BinaryIdResolver* idRslvr, + BinaryTypeManager* metaMgr, BinaryTypeHandler* metaHnd, int32_t start) : + stream(stream), idRslvr(idRslvr), metaMgr(metaMgr), metaHnd(metaHnd), typeId(idRslvr->GetTypeId()), + elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(-1), start(start) + { + // No-op. + } + + BinaryWriterImpl::BinaryWriterImpl(InteropOutputStream* stream, BinaryTypeManager* metaMgr) : + stream(stream), idRslvr(NULL), metaMgr(metaMgr), metaHnd(NULL), typeId(0), + elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(0), start(stream->Position()) + { + // No-op. + } + + void BinaryWriterImpl::WriteInt8(const int8_t val) + { + WritePrimitiveRaw<int8_t>(val, BinaryUtils::WriteInt8); + } + + void BinaryWriterImpl::WriteInt8Array(const int8_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int8_t>(val, len, BinaryUtils::WriteInt8Array, IGNITE_TYPE_ARRAY_BYTE); + } + + void BinaryWriterImpl::WriteInt8(const char* fieldName, const int8_t val) + { + WritePrimitive<int8_t>(fieldName, val, BinaryUtils::WriteInt8, IGNITE_TYPE_BYTE, 1); + } + + void BinaryWriterImpl::WriteInt8Array(const char* fieldName, const int8_t* val, const int32_t len) + { + WritePrimitiveArray<int8_t>(fieldName, val, len, BinaryUtils::WriteInt8Array, IGNITE_TYPE_ARRAY_BYTE, 0); + } + + void BinaryWriterImpl::WriteBool(const bool val) + { + WritePrimitiveRaw<bool>(val, BinaryUtils::WriteBool); + } + + void BinaryWriterImpl::WriteBoolArray(const bool* val, const int32_t len) + { + WritePrimitiveArrayRaw<bool>(val, len, BinaryUtils::WriteBoolArray, IGNITE_TYPE_ARRAY_BOOL); + } + + void BinaryWriterImpl::WriteBool(const char* fieldName, const bool val) + { + WritePrimitive<bool>(fieldName, val, BinaryUtils::WriteBool, IGNITE_TYPE_BOOL, 1); + } + + void BinaryWriterImpl::WriteBoolArray(const char* fieldName, const bool* val, const int32_t len) + { + WritePrimitiveArray<bool>(fieldName, val, len, BinaryUtils::WriteBoolArray, IGNITE_TYPE_ARRAY_BOOL, 0); + } + + void BinaryWriterImpl::WriteInt16(const int16_t val) + { + WritePrimitiveRaw<int16_t>(val, BinaryUtils::WriteInt16); + } + + void BinaryWriterImpl::WriteInt16Array(const int16_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int16_t>(val, len, BinaryUtils::WriteInt16Array, IGNITE_TYPE_ARRAY_SHORT); + } + + void BinaryWriterImpl::WriteInt16(const char* fieldName, const int16_t val) + { + WritePrimitive<int16_t>(fieldName, val, BinaryUtils::WriteInt16, IGNITE_TYPE_SHORT, 2); + } + + void BinaryWriterImpl::WriteInt16Array(const char* fieldName, const int16_t* val, const int32_t len) + { + WritePrimitiveArray<int16_t>(fieldName, val, len, BinaryUtils::WriteInt16Array, IGNITE_TYPE_ARRAY_SHORT, 1); + } + + void BinaryWriterImpl::WriteUInt16(const uint16_t val) + { + WritePrimitiveRaw<uint16_t>(val, BinaryUtils::WriteUInt16); + } + + void BinaryWriterImpl::WriteUInt16Array(const uint16_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<uint16_t>(val, len, BinaryUtils::WriteUInt16Array, IGNITE_TYPE_ARRAY_CHAR); + } + + void BinaryWriterImpl::WriteUInt16(const char* fieldName, const uint16_t val) + { + WritePrimitive<uint16_t>(fieldName, val, BinaryUtils::WriteUInt16, IGNITE_TYPE_CHAR, 2); + } + + void BinaryWriterImpl::WriteUInt16Array(const char* fieldName, const uint16_t* val, const int32_t len) + { + WritePrimitiveArray<uint16_t>(fieldName, val, len, BinaryUtils::WriteUInt16Array, IGNITE_TYPE_ARRAY_CHAR, 1); + } + + void BinaryWriterImpl::WriteInt32(const int32_t val) + { + WritePrimitiveRaw<int32_t>(val, BinaryUtils::WriteInt32); + } + + void BinaryWriterImpl::WriteInt32Array(const int32_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int32_t>(val, len, BinaryUtils::WriteInt32Array, IGNITE_TYPE_ARRAY_INT); + } + + void BinaryWriterImpl::WriteInt32(const char* fieldName, const int32_t val) + { + WritePrimitive<int32_t>(fieldName, val, BinaryUtils::WriteInt32, IGNITE_TYPE_INT, 4); + } + + void BinaryWriterImpl::WriteInt32Array(const char* fieldName, const int32_t* val, const int32_t len) + { + WritePrimitiveArray<int32_t>(fieldName, val, len, BinaryUtils::WriteInt32Array, IGNITE_TYPE_ARRAY_INT, 2); + } + + void BinaryWriterImpl::WriteInt64(const int64_t val) + { + WritePrimitiveRaw<int64_t>(val, BinaryUtils::WriteInt64); + } + + void BinaryWriterImpl::WriteInt64Array(const int64_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int64_t>(val, len, BinaryUtils::WriteInt64Array, IGNITE_TYPE_ARRAY_LONG); + } + + void BinaryWriterImpl::WriteInt64(const char* fieldName, const int64_t val) + { + WritePrimitive<int64_t>(fieldName, val, BinaryUtils::WriteInt64, IGNITE_TYPE_LONG, 8); + } + + void BinaryWriterImpl::WriteInt64Array(const char* fieldName, const int64_t* val, const int32_t len) + { + WritePrimitiveArray<int64_t>(fieldName, val, len, BinaryUtils::WriteInt64Array, IGNITE_TYPE_ARRAY_LONG, 3); + } + + void BinaryWriterImpl::WriteFloat(const float val) + { + WritePrimitiveRaw<float>(val, BinaryUtils::WriteFloat); + } + + void BinaryWriterImpl::WriteFloatArray(const float* val, const int32_t len) + { + WritePrimitiveArrayRaw<float>(val, len, BinaryUtils::WriteFloatArray, IGNITE_TYPE_ARRAY_FLOAT); + } + + void BinaryWriterImpl::WriteFloat(const char* fieldName, const float val) + { + WritePrimitive<float>(fieldName, val, BinaryUtils::WriteFloat, IGNITE_TYPE_FLOAT, 4); + } + + void BinaryWriterImpl::WriteFloatArray(const char* fieldName, const float* val, const int32_t len) + { + WritePrimitiveArray<float>(fieldName, val, len, BinaryUtils::WriteFloatArray, IGNITE_TYPE_ARRAY_FLOAT, 2); + } + + void BinaryWriterImpl::WriteDouble(const double val) + { + WritePrimitiveRaw<double>(val, BinaryUtils::WriteDouble); + } + + void BinaryWriterImpl::WriteDoubleArray(const double* val, const int32_t len) + { + WritePrimitiveArrayRaw<double>(val, len, BinaryUtils::WriteDoubleArray, IGNITE_TYPE_ARRAY_DOUBLE); + } + + void BinaryWriterImpl::WriteDouble(const char* fieldName, const double val) + { + WritePrimitive<double>(fieldName, val, BinaryUtils::WriteDouble, IGNITE_TYPE_DOUBLE, 8); + } + + void BinaryWriterImpl::WriteDoubleArray(const char* fieldName, const double* val, const int32_t len) + { + WritePrimitiveArray<double>(fieldName, val, len, BinaryUtils::WriteDoubleArray, IGNITE_TYPE_ARRAY_DOUBLE, 3); + } + + void BinaryWriterImpl::WriteGuid(const Guid val) + { + CheckRawMode(true); + CheckSingleMode(true); + + stream->WriteInt8(IGNITE_TYPE_UUID); + + BinaryUtils::WriteGuid(stream, val); + } + + void BinaryWriterImpl::WriteGuidArray(const Guid* val, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_UUID); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + Guid elem = *(val + i); + + stream->WriteInt8(IGNITE_TYPE_UUID); + BinaryUtils::WriteGuid(stream, elem); + } + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteGuid(const char* fieldName, const Guid val) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_UUID); + + stream->WriteInt8(IGNITE_TYPE_UUID); + + BinaryUtils::WriteGuid(stream, val); + } + + void BinaryWriterImpl::WriteGuidArray(const char* fieldName, const Guid* val, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_UUID); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_UUID); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + Guid elem = *(val + i); + + WriteTopObject(elem); + } + } + else + { + stream->WriteInt8(IGNITE_HDR_NULL); + } + } + + void BinaryWriterImpl::WriteString(const char* val, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_STRING); + + BinaryUtils::WriteString(stream, val, len); + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteString(const char* fieldName, const char* val, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_STRING); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_STRING); + + BinaryUtils::WriteString(stream, val, len); + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + int32_t BinaryWriterImpl::WriteStringArray() + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_ARRAY_STRING); + stream->Position(stream->Position() + 4); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteStringArray(const char* fieldName) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_STRING); + + stream->WriteInt8(IGNITE_TYPE_ARRAY_STRING); + stream->Position(stream->Position() + 4); + + return elemId; + } + + void BinaryWriterImpl::WriteStringElement(int32_t id, const char* val, int32_t len) + { + CheckSession(id); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_STRING); + + BinaryUtils::WriteString(stream, val, len); + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + + elemCnt++; + } + + void BinaryWriterImpl::WriteNull() + { + CheckRawMode(true); + CheckSingleMode(true); + + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteNull(const char* fieldName) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_OBJECT); + stream->WriteInt8(IGNITE_HDR_NULL); + } + + int32_t BinaryWriterImpl::WriteArray() + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_ARRAY); + stream->Position(stream->Position() + 4); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteArray(const char* fieldName) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY); + + stream->WriteInt8(IGNITE_TYPE_ARRAY); + stream->Position(stream->Position() + 4); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteCollection(CollectionType typ) + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_COLLECTION); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteCollection(const char* fieldName, CollectionType typ) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_COLLECTION); + + stream->WriteInt8(IGNITE_TYPE_COLLECTION); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteMap(ignite::binary::MapType typ) + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_MAP); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteMap(const char* fieldName, ignite::binary::MapType typ) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_MAP); + + stream->WriteInt8(IGNITE_TYPE_MAP); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + void BinaryWriterImpl::CommitContainer(int32_t id) + { + CheckSession(id); + + stream->WriteInt32(elemPos + 1, elemCnt); + + elemId = 0; + elemCnt = 0; + elemPos = -1; + } + + void BinaryWriterImpl::SetRawMode() + { + CheckRawMode(false); + CheckSingleMode(true); + + rawPos = stream->Position(); + } + + int32_t BinaryWriterImpl::GetRawPosition() const + { + return rawPos == -1 ? stream->Position() : rawPos; + } + + void BinaryWriterImpl::CheckRawMode(bool expected) const + { + bool rawMode = rawPos != -1; + + if (expected && !rawMode) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation can be performed only in raw mode."); + } + else if (!expected && rawMode) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation cannot be performed in raw mode."); + } + } + + void BinaryWriterImpl::CheckSingleMode(bool expected) const + { + if (expected && elemId != 0) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation cannot be performed when container is being written."); + } + else if (!expected && elemId == 0) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation can be performed only when container is being written."); + } + } + + void BinaryWriterImpl::StartContainerSession(bool expRawMode) + { + CheckRawMode(expRawMode); + CheckSingleMode(true); + + elemId = ++elemIdGen; + elemPos = stream->Position(); + } + + void BinaryWriterImpl::CheckSession(int32_t expSes) const + { + if (elemId != expSes) + { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Containter write session has been finished or is not started yet."); + } + } + + void BinaryWriterImpl::WriteFieldId(const char* fieldName, int32_t fieldTypeId) + { + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldOff = stream->Position() - start; + + schema.AddField(fieldId, fieldOff); + + if (metaHnd) + metaHnd->OnFieldWritten(fieldId, fieldName, fieldTypeId); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int8_t>(const int8_t& obj) + { + WriteTopObject0<int8_t>(obj, BinaryUtils::WriteInt8, IGNITE_TYPE_BYTE); + } + + template <> + void BinaryWriterImpl::WriteTopObject<bool>(const bool& obj) + { + WriteTopObject0<bool>(obj, BinaryUtils::WriteBool, IGNITE_TYPE_BOOL); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int16_t>(const int16_t& obj) + { + WriteTopObject0<int16_t>(obj, BinaryUtils::WriteInt16, IGNITE_TYPE_SHORT); + } + + template <> + void BinaryWriterImpl::WriteTopObject<uint16_t>(const uint16_t& obj) + { + WriteTopObject0<uint16_t>(obj, BinaryUtils::WriteUInt16, IGNITE_TYPE_CHAR); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int32_t>(const int32_t& obj) + { + WriteTopObject0<int32_t>(obj, BinaryUtils::WriteInt32, IGNITE_TYPE_INT); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int64_t>(const int64_t& obj) + { + WriteTopObject0<int64_t>(obj, BinaryUtils::WriteInt64, IGNITE_TYPE_LONG); + } + + template <> + void BinaryWriterImpl::WriteTopObject<float>(const float& obj) + { + WriteTopObject0<float>(obj, BinaryUtils::WriteFloat, IGNITE_TYPE_FLOAT); + } + + template <> + void BinaryWriterImpl::WriteTopObject<double>(const double& obj) + { + WriteTopObject0<double>(obj, BinaryUtils::WriteDouble, IGNITE_TYPE_DOUBLE); + } + + template <> + void BinaryWriterImpl::WriteTopObject<Guid>(const Guid& obj) + { + WriteTopObject0<Guid>(obj, BinaryUtils::WriteGuid, IGNITE_TYPE_UUID); + } + + void BinaryWriterImpl::PostWrite() + { + int32_t lenWithoutSchema = stream->Position() - start; + + int32_t nonRawLen = rawPos == -1 ? lenWithoutSchema : rawPos - start; + + if (schema.Empty()) + { + stream->WriteInt16(start + IGNITE_OFFSET_FLAGS, IGNITE_BINARY_FLAG_USER_OBJECT | + IGNITE_BINARY_FLAG_RAW_ONLY); + stream->WriteInt32(start + IGNITE_OFFSET_LEN, lenWithoutSchema); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, 0); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, GetRawPosition() - start); + } + else + { + int32_t schemaId = schema.GetId(); + BinaryOffsetType schemaType = schema.GetType(); + + WriteAndClearSchema(); + + if (rawPos > 0) + stream->WriteInt32(rawPos - start); + + int32_t length = stream->Position() - start; + + if (schemaType == OFFSET_TYPE_1_BYTE) + { + stream->WriteInt16(start + IGNITE_OFFSET_FLAGS, + IGNITE_BINARY_FLAG_USER_OBJECT | IGNITE_BINARY_FLAG_OFFSET_1_BYTE); + } + else if (schemaType == OFFSET_TYPE_2_BYTE) + { + stream->WriteInt16(start + IGNITE_OFFSET_FLAGS, + IGNITE_BINARY_FLAG_USER_OBJECT | IGNITE_BINARY_FLAG_OFFSET_2_BYTE); + } + + stream->WriteInt32(start + IGNITE_OFFSET_LEN, length); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, schemaId); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, lenWithoutSchema); + } + } + + bool BinaryWriterImpl::HasSchema() const + { + return !schema.Empty(); + } + + void BinaryWriterImpl::WriteAndClearSchema() + { + schema.Write(*stream); + + schema.Clear(); + } + + InteropOutputStream* BinaryWriterImpl::GetStream() + { + return stream; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/303d79eb/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp index 05c3e38..08526b5 100644 --- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp @@ -18,10 +18,10 @@ #include "ignite/cache/cache_peek_mode.h" #include "ignite/impl/cache/cache_impl.h" #include "ignite/impl/interop/interop.h" -#include "ignite/impl/portable/portable_reader_impl.h" +#include "ignite/impl/binary/binary_reader_impl.h" #include "ignite/impl/utils.h" -#include "ignite/impl/portable/portable_metadata_updater_impl.h" -#include "ignite/portable/portable.h" +#include "ignite/impl/binary/binary_type_updater_impl.h" +#include "ignite/binary/binary.h" using namespace ignite::common::concurrent; using namespace ignite::common::java; @@ -30,9 +30,9 @@ using namespace ignite::cache::query; using namespace ignite::impl; using namespace ignite::impl::cache::query; using namespace ignite::impl::interop; -using namespace ignite::impl::portable; +using namespace ignite::impl::binary; using namespace ignite::impl::utils; -using namespace ignite::portable; +using namespace ignite::binary; namespace ignite { @@ -301,12 +301,12 @@ namespace ignite int64_t CacheImpl::WriteTo(InteropMemory* mem, InputOperation& inOp, IgniteError* err) { - PortableMetadataManager* metaMgr = env.Get()->GetMetadataManager(); + BinaryTypeManager* metaMgr = env.Get()->GetTypeManager(); int32_t metaVer = metaMgr->GetVersion(); InteropOutputStream out(mem); - PortableWriterImpl writer(&out, metaMgr); + BinaryWriterImpl writer(&out, metaMgr); inOp.ProcessInput(writer); @@ -314,7 +314,7 @@ namespace ignite if (metaMgr->IsUpdatedSince(metaVer)) { - PortableMetadataUpdaterImpl metaUpdater(env, javaRef); + BinaryTypeUpdaterImpl metaUpdater(env, javaRef); if (!metaMgr->ProcessPendingUpdates(&metaUpdater, err)) return 0; @@ -327,7 +327,7 @@ namespace ignite { InteropInputStream in(mem); - PortableReaderImpl reader(&in); + BinaryReaderImpl reader(&in); outOp.ProcessOutput(reader); }