http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqQuery.hpp ---------------------------------------------------------------------- diff --git a/clicache/src/CqQuery.hpp b/clicache/src/CqQuery.hpp new file mode 100644 index 0000000..8cbf9f8 --- /dev/null +++ b/clicache/src/CqQuery.hpp @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "geode_defs.hpp" +#include "CqState.hpp" +#include "begin_native.hpp" +#include <geode/CqQuery.hpp> +#include "end_native.hpp" +#include "native_shared_ptr.hpp" + + +using namespace System; + +namespace Apache +{ + namespace Geode + { + namespace Client + { + namespace native = apache::geode::client; + + generic<class TResult> + interface class ICqResults; + + generic<class TKey, class TResult> + ref class CqAttributes; + + ref class CqStatistics; + + generic<class TKey, class TResult> + ref class CqAttributesMutator; + + generic<class TResult> + ref class Query; + + /// <summary> + /// Class to encapsulate a continuous query (CQ). + /// </summary> + /// <remarks> + /// A CqQuery is obtained from a QueryService which in turn is obtained + /// from the Cache. 
+ /// This can be executed to return SelectResults which can be either + /// a ResultSet or a StructSet, or it can be just registered on the + /// java server without returning results immediately rather only + /// the incremental results. + /// + /// This class is intentionally not thread-safe. So multiple threads + /// should not operate on the same <c>CqQuery</c> object concurrently + /// rather should have their own <c>CqQuery</c> objects. + /// </remarks> + generic<class TKey, class TResult> + public ref class CqQuery sealed + { + public: + + /// <summary> + /// Executes the Cq Query on the cache server + /// </summary> + void Execute( ); + + /// <summary> + /// Executes the Cq Query on the cache server + /// and returns the initial CQ results. + /// </summary> + ICqResults<TResult>^ ExecuteWithInitialResults(); + + /// <summary> + /// Executes the Cq Query on the cache server + /// with the specified timeout and returns the results. + /// </summary> + /// <param name="timeout">The time (in seconds) to wait for query response. + /// This should be less than or equal to 2^31/1000 i.e. 2147483. + /// </param> + /// <exception cref="IllegalArgumentException"> + /// if timeout parameter is greater than 2^31/1000. + /// </exception> + ICqResults<TResult>^ ExecuteWithInitialResults(System::UInt32 timeout); + + /// <summary> + /// Get the string for this cq query. + /// </summary> + property String^ QueryString + { + String^ get( ); + } + + /// <summary> + /// Get the name for this cq query. + /// </summary> + property String^ Name + { + String^ get( ); + } + + /// <summary> + /// Get the Attributes for this cq query. + /// </summary> + CqAttributes<TKey, TResult>^ GetCqAttributes(); + + /// <summary> + /// Get the Attributes Mutator for this cq query. + /// </summary> + CqAttributesMutator<TKey, TResult>^ GetCqAttributesMutator(); + + /// <summary> + /// Get the stats for this cq query. 
+ /// </summary> + CqStatistics^ GetStatistics(); + + /// <summary> + /// Get the Query for this cq query. + /// </summary> + Query<TResult>^ GetQuery(); + + /// <summary> + /// stop the cq query + /// </summary> + void Stop( ); + + /// <summary> + /// close the cq query + /// </summary> + void Close( ); + + /// <summary> + /// get the state of this cq query + /// </summary> + CqStateType GetState(); + + /// <summary> + /// Is this Cq in running state? + /// </summary> + bool IsRunning(); + + /// <summary> + /// Is this Cq in stopped state? + /// </summary> + bool IsStopped(); + + /// <summary> + /// Is this Cq in closed state? + /// </summary> + bool IsClosed(); + + internal: + + /// <summary> + /// Internal factory function to wrap a native object pointer inside + /// this managed class with null pointer check. + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + /// <returns> + /// The managed wrapper object; null if the native pointer is null. + /// </returns> + inline static CqQuery<TKey, TResult>^ Create( native::CqQueryPtr nativeptr ) + { + return __nullptr == nativeptr ? nullptr : + gcnew CqQuery<TKey, TResult>( nativeptr ); + } + + + private: + + /// <summary> + /// Private constructor to wrap a native object pointer + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + inline CqQuery( native::CqQueryPtr nativeptr ) + { + m_nativeptr = gcnew native_shared_ptr<native::CqQuery>(nativeptr); + } + + + native_shared_ptr<native::CqQuery>^ m_nativeptr; + }; + } // namespace Client + } // namespace Geode +} // namespace Apache +
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqServiceStatistics.cpp ---------------------------------------------------------------------- diff --git a/clicache/src/CqServiceStatistics.cpp b/clicache/src/CqServiceStatistics.cpp new file mode 100644 index 0000000..6a9da4c --- /dev/null +++ b/clicache/src/CqServiceStatistics.cpp @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "CqServiceStatistics.hpp" + + +namespace Apache +{ + namespace Geode + { + namespace Client + { + using namespace System; + + // Every accessor below forwards to the wrapped native CqServiceStatistics object. + // GC::KeepAlive in the finally block references m_nativeptr after the native call, + // so the managed holder stays referenced for the duration of that call. + System::UInt32 CqServiceStatistics::numCqsActive() + { + try + { + return m_nativeptr->get()->numCqsActive(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqServiceStatistics::numCqsCreated() + { + try + { + return m_nativeptr->get()->numCqsCreated(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqServiceStatistics::numCqsClosed() + { + try + { + return m_nativeptr->get()->numCqsClosed(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqServiceStatistics::numCqsStopped() + { + try + { + return m_nativeptr->get()->numCqsStopped(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqServiceStatistics::numCqsOnClient() + { + try + { + return m_nativeptr->get()->numCqsOnClient(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + } // namespace Client + } // namespace Geode +} // namespace Apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqServiceStatistics.hpp ---------------------------------------------------------------------- diff --git a/clicache/src/CqServiceStatistics.hpp b/clicache/src/CqServiceStatistics.hpp new file mode 100644 index 0000000..d711d8a --- /dev/null +++ b/clicache/src/CqServiceStatistics.hpp @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "geode_defs.hpp" +#include "begin_native.hpp" +#include <geode/CqServiceStatistics.hpp> +#include "end_native.hpp" +#include "native_shared_ptr.hpp" + +namespace Apache +{ + namespace Geode + { + namespace Client + { + namespace native = apache::geode::client; + + /// <summary> + /// Defines common statistical information for the CQ service. + /// </summary> + public ref class CqServiceStatistics sealed + { + public: + + /// <summary> + /// Get the number of CQs currently active. + /// Active CQs are those which are executing (in running state). + /// </summary> + System::UInt32 numCqsActive( ); + + /// <summary> + /// Get the total number of CQs created. This is a cumulative number. + /// </summary> + System::UInt32 numCqsCreated( ); + + /// <summary> + /// Get the total number of closed CQs. This is a cumulative number. + /// </summary> + System::UInt32 numCqsClosed( ); + + /// <summary> + /// Get the number of CQs that are currently stopped. + /// </summary> + System::UInt32 numCqsStopped( ); + + /// <summary> + /// Get the number of CQs that are currently active or stopped. + /// The CQs included in this number are either running or stopped (suspended). + /// Closed CQs are not included. + /// </summary> + System::UInt32 numCqsOnClient( ); + + internal: + + /// <summary> + /// Internal factory function to wrap a native object pointer inside + /// this managed class with null pointer check. 
+ /// </summary> + /// <param name="nativeptr">The native object pointer</param> + /// <returns> + /// The managed wrapper object; null if the native pointer is null. + /// </returns> + inline static CqServiceStatistics^ Create( apache::geode::client::CqServiceStatisticsPtr nativeptr ) + { + return __nullptr == nativeptr ? nullptr : + gcnew CqServiceStatistics( nativeptr ); + } + + + private: + + /// <summary> + /// Private constructor to wrap a native object pointer + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + inline CqServiceStatistics( apache::geode::client::CqServiceStatisticsPtr nativeptr ) + { + m_nativeptr = gcnew native_shared_ptr<native::CqServiceStatistics>(nativeptr); + } + + /// Managed holder for the native CqServiceStatistics pointer; assigned in the constructor. + native_shared_ptr<native::CqServiceStatistics>^ m_nativeptr; + }; + } // namespace Client + } // namespace Geode +} // namespace Apache + http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqState.cpp ---------------------------------------------------------------------- diff --git a/clicache/src/CqState.cpp b/clicache/src/CqState.cpp new file mode 100644 index 0000000..0a83a1d --- /dev/null +++ b/clicache/src/CqState.cpp @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//#include "geode_includes.hpp" +#include "CqState.hpp" +#include <vcclr.h> + +#include "impl/ManagedString.hpp" +using namespace System; +using namespace System::Runtime::InteropServices; + +namespace Apache +{ + namespace Geode + { + namespace Client + { + + // Returns the native object's toString() result converted with ManagedString::Get. + String^ CqState::ToString() + { + return ManagedString::Get(m_nativeptr->toString()); + } + + // The Is* predicates below forward directly to the wrapped native CqState. + bool CqState::IsRunning() + { + return m_nativeptr->isRunning(); + } + + bool CqState::IsStopped() + { + return m_nativeptr->isStopped(); + } + + bool CqState::IsClosed() + { + return m_nativeptr->isClosed(); + } + + bool CqState::IsClosing() + { + return m_nativeptr->isClosing(); + } + + // Maps the managed CqStateType onto the native StateType and stores it in the + // native object. Any value other than STOPPED, RUNNING, CLOSED or CLOSING is + // forwarded as the native INVALID state. + void CqState::SetState( CqStateType state ) + { + apache::geode::client::CqState::StateType st =apache::geode::client::CqState::INVALID; + if(state == CqStateType::STOPPED) + st = apache::geode::client::CqState::STOPPED; + else if(state == CqStateType::RUNNING) + st = apache::geode::client::CqState::RUNNING; + else if(state == CqStateType::CLOSED) + st = apache::geode::client::CqState::CLOSED; + else if(state == CqStateType::CLOSING) + st = apache::geode::client::CqState::CLOSING; + + m_nativeptr->setState( st ); + } + + // Reads the native state and maps it back to the managed CqStateType; any + // native value not matched below is reported as CqStateType::INVALID. 
+ CqStateType CqState::GetState( ) + { + apache::geode::client::CqState::StateType st = m_nativeptr->getState( ); + CqStateType state; + if(st==apache::geode::client::CqState::STOPPED) + state = CqStateType::STOPPED; + else if(st==apache::geode::client::CqState::RUNNING) + state = CqStateType::RUNNING; + else if(st==apache::geode::client::CqState::CLOSED) + state = CqStateType::CLOSED; + else if(st==apache::geode::client::CqState::CLOSING) + state = CqStateType::CLOSING; + else + state = CqStateType::INVALID; + return state; + } // end of CqState::GetState + } // namespace Client +} // namespace Geode + + } // namespace Apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqState.hpp 
---------------------------------------------------------------------- diff --git a/clicache/src/CqState.hpp b/clicache/src/CqState.hpp new file mode 100644 index 0000000..21ad5e6 --- /dev/null +++ b/clicache/src/CqState.hpp @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "geode_defs.hpp" +#include "begin_native.hpp" +#include <geode/CqState.hpp> +#include "end_native.hpp" + + + +using namespace System; + +namespace Apache +{ + namespace Geode + { + namespace Client + { + namespace native = apache::geode::client; + + /// <summary> + /// Enumerated type for cq state + /// @nativeclient + /// For Native Clients: + /// @endnativeclient + /// </summary> + public enum class CqStateType + { + STOPPED = 0, + RUNNING, + CLOSED, + CLOSING, + INVALID + }; + + + /// <summary> + /// Managed wrapper around a native <c>CqState</c> pointer that exposes the state of a CQ. + /// </summary> + public ref class CqState sealed + { + public: + + /// <summary> + /// Returns the state in string form. + /// </summary> + virtual String^ ToString( ) override; + + /// <summary> + /// Returns true if the CQ is in Running state. 
+ /// </summary> + bool IsRunning(); + + /// <summary> + /// Returns true if the CQ is in Stopped state. + /// </summary> + bool IsStopped(); + + /// <summary> + /// Returns true if the CQ is in Closed state. + /// </summary> + bool IsClosed(); + + /// <summary> + /// Returns true if the CQ is in Closing state. + /// </summary> + bool IsClosing(); + + /// <summary> + /// Sets the state held by the wrapped native object. + /// </summary> + /// <param name="state">The new state</param> + void SetState(CqStateType state); + + /// <summary> + /// Returns the state held by the wrapped native object as a <c>CqStateType</c>. + /// </summary> + CqStateType GetState(); + + internal: + + /// <summary> + /// Internal constructor to wrap a native object pointer + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + inline CqState( native::CqState* nativeptr ) + : m_nativeptr(nativeptr) + { + } + + private: + /// Raw pointer to the wrapped native CqState; stored as given to the constructor. + native::CqState* m_nativeptr; + }; + } // namespace Client + } // namespace Geode +} // namespace Apache + http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqStatistics.cpp ---------------------------------------------------------------------- diff --git a/clicache/src/CqStatistics.cpp b/clicache/src/CqStatistics.cpp new file mode 100644 index 0000000..ad19481 --- /dev/null +++ b/clicache/src/CqStatistics.cpp @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "CqStatistics.hpp" + +namespace Apache +{ + namespace Geode + { + namespace Client + { + using namespace System; + + // Every accessor below forwards to the wrapped native CqStatistics object. + // GC::KeepAlive in the finally block references m_nativeptr after the native call, + // so the managed holder stays referenced for the duration of that call. + System::UInt32 CqStatistics::numInserts() + { + try + { + return m_nativeptr->get()->numInserts(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqStatistics::numDeletes() + { + try + { + return m_nativeptr->get()->numDeletes(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqStatistics::numUpdates() + { + try + { + return m_nativeptr->get()->numUpdates(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + System::UInt32 CqStatistics::numEvents() + { + try + { + return m_nativeptr->get()->numEvents(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + + } // namespace Client + } // namespace Geode +} // namespace Apache + http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/CqStatistics.hpp ---------------------------------------------------------------------- diff --git a/clicache/src/CqStatistics.hpp b/clicache/src/CqStatistics.hpp new file mode 100644 index 0000000..05aa23c --- /dev/null +++ b/clicache/src/CqStatistics.hpp @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "geode_defs.hpp" +#include "begin_native.hpp" +#include <geode/CqStatistics.hpp> +#include "end_native.hpp" +#include "native_shared_ptr.hpp" + + +namespace Apache +{ + namespace Geode + { + namespace Client + { + namespace native = apache::geode::client; + + /// <summary> + /// Defines common statistical information for a CQ. + /// </summary> + public ref class CqStatistics sealed + { + public: + + /// <summary> + /// Get the number of inserts qualified by this CQ. + /// </summary> + System::UInt32 numInserts( ); + + /// <summary> + /// Get the number of deletes qualified by this CQ. + /// </summary> + System::UInt32 numDeletes( ); + + /// <summary> + /// Get the number of updates qualified by this CQ. + /// </summary> + System::UInt32 numUpdates( ); + + /// <summary> + /// Get the number of events qualified by this CQ. + /// </summary> + System::UInt32 numEvents( ); + + internal: + + /// <summary> + /// Internal factory function to wrap a native object pointer inside + /// this managed class with null pointer check. + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + /// <returns> + /// The managed wrapper object; null if the native pointer is null. + /// </returns> + inline static CqStatistics^ Create( apache::geode::client::CqStatisticsPtr nativeptr ) + { + return __nullptr == nativeptr ? 
nullptr : + gcnew CqStatistics( nativeptr ); + } + + + private: + + /// <summary> + /// Private constructor to wrap a native object pointer + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + inline CqStatistics( apache::geode::client::CqStatisticsPtr nativeptr ) + { + m_nativeptr = gcnew native_shared_ptr<native::CqStatistics>(nativeptr); + } + + /// Managed holder for the native CqStatistics pointer; assigned in the constructor. + native_shared_ptr<native::CqStatistics>^ m_nativeptr; + + }; + } // namespace Client + } // namespace Geode +} // namespace Apache + http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/DataInput.cpp ---------------------------------------------------------------------- diff --git a/clicache/src/DataInput.cpp b/clicache/src/DataInput.cpp new file mode 100644 index 0000000..a3ca689 --- /dev/null +++ b/clicache/src/DataInput.cpp @@ -0,0 +1,1165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "begin_native.hpp" +#include <geode/Cache.hpp> +#include <GeodeTypeIdsImpl.hpp> +#include "SerializationRegistry.hpp" +#include "CacheRegionHelper.hpp" +#include "end_native.hpp" + +#include <vcclr.h> + +#include "DataInput.hpp" +#include "Cache.hpp" +#include "CacheableString.hpp" +#include "CacheableHashMap.hpp" +#include "CacheableStack.hpp" +#include "CacheableVector.hpp" +#include "CacheableArrayList.hpp" +#include "CacheableIDentityHashMap.hpp" +#include "CacheableDate.hpp" +#include "CacheableObjectArray.hpp" +#include "Serializable.hpp" +#include "impl/PdxHelper.hpp" + +using namespace System; +using namespace System::IO; +using namespace apache::geode::client; + +namespace Apache +{ + namespace Geode + { + namespace Client + { + namespace native = apache::geode::client; + + DataInput::DataInput(System::Byte* buffer, int size, const native::Cache* cache) + { + m_ispdxDesrialization = false; + m_isRootObjectPdx = false; + m_cache = cache; + if (buffer != nullptr && size > 0) { + _GF_MG_EXCEPTION_TRY2 + + m_nativeptr = gcnew native_conditional_unique_ptr<native::DataInput>(cache->createDataInput(buffer, size)); + m_cursor = 0; + m_isManagedObject = false; + m_forStringDecode = gcnew array<Char>(100); + + try + { + m_buffer = const_cast<System::Byte*>(m_nativeptr->get()->currentBufferPosition()); + m_bufferLength = m_nativeptr->get()->getBytesRemaining(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + + _GF_MG_EXCEPTION_CATCH_ALL2 + } + else { + throw gcnew IllegalArgumentException("DataInput.ctor(): " + "provided buffer is null or empty"); + } + } + + DataInput::DataInput(array<Byte>^ buffer, const native::Cache * cache) + { + m_ispdxDesrialization = false; + m_isRootObjectPdx = false; + m_cache = cache; + if (buffer != nullptr && buffer->Length > 0) { + _GF_MG_EXCEPTION_TRY2 + + System::Int32 len = buffer->Length; + GF_NEW(m_buffer, System::Byte[len]); + pin_ptr<const Byte> pin_buffer = &buffer[0]; + memcpy(m_buffer, 
(void*)pin_buffer, len); + m_nativeptr = gcnew native_conditional_unique_ptr<native::DataInput>(m_cache->createDataInput(m_buffer, len)); + + m_cursor = 0; + m_isManagedObject = false; + m_forStringDecode = gcnew array<Char>(100); + + try + { + m_buffer = const_cast<System::Byte*>(m_nativeptr->get()->currentBufferPosition()); + m_bufferLength = m_nativeptr->get()->getBytesRemaining(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + + _GF_MG_EXCEPTION_CATCH_ALL2 + } + else { + throw gcnew IllegalArgumentException("DataInput.ctor(): " + "provided buffer is null or empty"); + } + } + + DataInput::DataInput(array<Byte>^ buffer, System::Int32 len, const native::Cache* cache) + { + m_ispdxDesrialization = false; + m_isRootObjectPdx = false; + m_cache = cache; + if (buffer != nullptr) { + if (len == 0 || (System::Int32)len > buffer->Length) { + throw gcnew IllegalArgumentException(String::Format( + "DataInput.ctor(): given length {0} is zero or greater than " + "size of buffer {1}", len, buffer->Length)); + } + //m_bytes = gcnew array<Byte>(len); + //System::Array::Copy(buffer, 0, m_bytes, 0, len); + _GF_MG_EXCEPTION_TRY2 + + GF_NEW(m_buffer, System::Byte[len]); + pin_ptr<const Byte> pin_buffer = &buffer[0]; + memcpy(m_buffer, (void*)pin_buffer, len); + m_nativeptr = gcnew native_conditional_unique_ptr<native::DataInput>(m_cache->createDataInput(m_buffer, len)); + + try + { + m_buffer = const_cast<System::Byte*>(m_nativeptr->get()->currentBufferPosition()); + m_bufferLength = m_nativeptr->get()->getBytesRemaining(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + + _GF_MG_EXCEPTION_CATCH_ALL2 + } + else { + throw gcnew IllegalArgumentException("DataInput.ctor(): " + "provided buffer is null"); + } + } + + void DataInput::CheckBufferSize(int size) + { + if ((unsigned int)(m_cursor + size) > m_bufferLength) + { + Log::Debug("DataInput::CheckBufferSize m_cursor:" + m_cursor + " size:" + size + " m_bufferLength:" + m_bufferLength); + throw gcnew 
OutOfRangeException("DataInput: attempt to read beyond buffer"); + } + } + + DataInput^ DataInput::GetClone() + { + return gcnew DataInput(m_buffer, m_bufferLength, m_cache); + } + + Byte DataInput::ReadByte() + { + CheckBufferSize(1); + return m_buffer[m_cursor++]; + } + + SByte DataInput::ReadSByte() + { + CheckBufferSize(1); + return m_buffer[m_cursor++]; + } + + bool DataInput::ReadBoolean() + { + CheckBufferSize(1); + Byte val = m_buffer[m_cursor++]; + if (val == 1) + return true; + else + return false; + } + + Char DataInput::ReadChar() + { + CheckBufferSize(2); + Char data = m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + return data; + } + + array<Byte>^ DataInput::ReadBytes() + { + System::Int32 length; + length = ReadArrayLen(); + + if (length >= 0) { + if (length == 0) + return gcnew array<Byte>(0); + else { + array<Byte>^ bytes = ReadBytesOnly(length); + return bytes; + } + } + return nullptr; + } + + int DataInput::ReadArrayLen() + { + int code; + int len; + + code = Convert::ToInt32(ReadByte()); + + if (code == 0xFF) { + len = -1; + } + else { + unsigned int result = code; + if (result > 252) { // 252 is java's ((byte)-4 && 0xFF) + if (code == 0xFE) { + result = ReadUInt16(); + } + else if (code == 0xFD) { + result = ReadUInt32(); + } + else { + throw gcnew IllegalStateException("unexpected array length code"); + } + //TODO:: illegal length + } + len = (int)result; + } + return len; + } + + array<SByte>^ DataInput::ReadSBytes() + { + System::Int32 length; + length = ReadArrayLen(); + + if (length > -1) { + if (length == 0) + return gcnew array<SByte>(0); + else { + array<SByte>^ bytes = ReadSBytesOnly(length); + return bytes; + } + } + return nullptr; + } + + array<Byte>^ DataInput::ReadBytesOnly(System::UInt32 len) + { + if (len > 0) { + CheckBufferSize(len); + array<Byte>^ bytes = gcnew array<Byte>(len); + + for (unsigned int i = 0; i < len; i++) + bytes[i] = m_buffer[m_cursor++]; + + return bytes; + } + return nullptr; + } + + 
void DataInput::ReadBytesOnly(array<Byte> ^ buffer, int offset, int count) + { + if (count > 0) { + CheckBufferSize((System::UInt32)count); + + for (int i = 0; i < count; i++) + buffer[offset + i] = m_buffer[m_cursor++]; + } + } + + array<SByte>^ DataInput::ReadSBytesOnly(System::UInt32 len) + { + if (len > 0) { + CheckBufferSize(len); + array<SByte>^ bytes = gcnew array<SByte>(len); + + for (unsigned int i = 0; i < len; i++) + bytes[i] = (SByte)m_buffer[m_cursor++]; + + return bytes; + } + return nullptr; + } + + System::UInt16 DataInput::ReadUInt16() + { + CheckBufferSize(2); + System::UInt16 data = m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + return data; + } + + System::UInt32 DataInput::ReadUInt32() + { + CheckBufferSize(4); + System::UInt32 data = m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + + return data; + } + + System::UInt64 DataInput::ReadUInt64() + { + System::UInt64 data; + + CheckBufferSize(8); + + data = m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + data = (data << 8) | m_buffer[m_cursor++]; + + return data; + } + + System::Int16 DataInput::ReadInt16() + { + return ReadUInt16(); + } + + System::Int32 DataInput::ReadInt32() + { + return ReadUInt32(); + } + + System::Int64 DataInput::ReadInt64() + { + return ReadUInt64(); + } + + array<Byte>^ DataInput::ReadReverseBytesOnly(int len) + { + CheckBufferSize(len); + + int i = 0; + int j = m_cursor + len - 1; + array<Byte>^ bytes = gcnew array<Byte>(len); + + while (i < len) + { + bytes[i++] = m_buffer[j--]; + } + m_cursor += len; + return bytes; + } + + float DataInput::ReadFloat() + { + float data; + + array<Byte>^ bytes = 
nullptr; + if (BitConverter::IsLittleEndian) + bytes = ReadReverseBytesOnly(4); + else + bytes = ReadBytesOnly(4); + + data = BitConverter::ToSingle(bytes, 0); + + return data; + } + + double DataInput::ReadDouble() + { + double data; + + array<Byte>^ bytes = nullptr; + if (BitConverter::IsLittleEndian) + bytes = ReadReverseBytesOnly(8); + else + bytes = ReadBytesOnly(8); + + data = BitConverter::ToDouble(bytes, 0); + + return data; + } + + String^ DataInput::ReadUTF() + { + int length = ReadUInt16(); + CheckBufferSize(length); + String^ str = DecodeBytes(length); + return str; + } + + String^ DataInput::ReadUTFHuge() + { + int length = ReadUInt32(); + CheckBufferSize(length); + + array<Char>^ chArray = gcnew array<Char>(length); + + for (int i = 0; i < length; i++) + { + Char ch = ReadByte(); + ch = ((ch << 8) | ReadByte()); + chArray[i] = ch; + } + + String^ str = gcnew String(chArray); + + return str; + } + + String^ DataInput::ReadASCIIHuge() + { + int length = ReadInt32(); + CheckBufferSize(length); + String^ str = DecodeBytes(length); + return str; + } + + Object^ DataInput::ReadObject() + { + return ReadInternalObject(); + } + + /* Object^ DataInput::ReadGenericObject( ) + { + return ReadInternalGenericObject(); + }*/ + + Object^ DataInput::ReadDotNetTypes(int8_t typeId) + { + switch (typeId) + { + case apache::geode::client::GeodeTypeIds::CacheableByte: + { + return ReadSByte(); + } + case apache::geode::client::GeodeTypeIds::CacheableBoolean: + { + bool obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableWideChar: + { + Char obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableDouble: + { + Double obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableASCIIString: + { + /* CacheableString^ cs = static_cast<CacheableString^>(CacheableString::CreateDeserializable()); + cs->FromData(this); + return cs->Value;*/ + return ReadUTF(); + } + 
case apache::geode::client::GeodeTypeIds::CacheableASCIIStringHuge: + { + /*CacheableString^ cs = static_cast<CacheableString^>(CacheableString::createDeserializableHuge()); + cs->FromData(this); + return cs->Value;*/ + return ReadASCIIHuge(); + } + case apache::geode::client::GeodeTypeIds::CacheableString: + { + /*CacheableString^ cs = static_cast<CacheableString^>(CacheableString::createUTFDeserializable()); + cs->FromData(this); + return cs->Value;*/ + return ReadUTF(); + } + case apache::geode::client::GeodeTypeIds::CacheableStringHuge: + { + //TODO: need to look all strings types + /*CacheableString^ cs = static_cast<CacheableString^>(CacheableString::createUTFDeserializableHuge()); + cs->FromData(this); + return cs->Value;*/ + return ReadUTFHuge(); + } + case apache::geode::client::GeodeTypeIds::CacheableFloat: + { + float obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableInt16: + { + Int16 obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableInt32: + { + Int32 obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableInt64: + { + Int64 obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableDate: + { + CacheableDate^ cd = CacheableDate::Create(); + cd->FromData(this); + return cd->Value; + } + case apache::geode::client::GeodeTypeIds::CacheableBytes: + { + return ReadBytes(); + } + case apache::geode::client::GeodeTypeIds::CacheableDoubleArray: + { + array<Double>^ obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableFloatArray: + { + array<float>^ obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableInt16Array: + { + array<Int16>^ obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableInt32Array: + { + array<Int32>^ obj; + ReadObject(obj); + return obj; + } + case 
apache::geode::client::GeodeTypeIds::BooleanArray: + { + array<bool>^ obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CharArray: + { + array<Char>^ obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableInt64Array: + { + array<Int64>^ obj; + ReadObject(obj); + return obj; + } + case apache::geode::client::GeodeTypeIds::CacheableStringArray: + { + return ReadStringArray(); + } + case apache::geode::client::GeodeTypeIds::CacheableHashTable: + { + return ReadHashtable(); + } + case apache::geode::client::GeodeTypeIds::CacheableHashMap: + { + CacheableHashMap^ chm = static_cast<CacheableHashMap^>(CacheableHashMap::CreateDeserializable()); + chm->FromData(this); + return chm->Value; + } + case apache::geode::client::GeodeTypeIds::CacheableIdentityHashMap: + { + CacheableIdentityHashMap^ chm = static_cast<CacheableIdentityHashMap^>(CacheableIdentityHashMap::CreateDeserializable()); + chm->FromData(this); + return chm->Value; + } + case apache::geode::client::GeodeTypeIds::CacheableVector: + { + /*CacheableVector^ cv = static_cast<CacheableVector^>(CacheableVector::CreateDeserializable()); + cv->FromData(this); + return cv->Value;*/ + int len = ReadArrayLen(); + System::Collections::ArrayList^ retA = gcnew System::Collections::ArrayList(len); + + for (int i = 0; i < len; i++) + { + retA->Add(this->ReadObject()); + } + return retA; + } + case apache::geode::client::GeodeTypeIds::CacheableArrayList: + { + /*CacheableArrayList^ cv = static_cast<CacheableArrayList^>(CacheableArrayList::CreateDeserializable()); + cv->FromData(this); + return cv->Value;*/ + int len = ReadArrayLen(); + System::Collections::Generic::List<Object^>^ retA = gcnew System::Collections::Generic::List<Object^>(len); + for (int i = 0; i < len; i++) + { + retA->Add(this->ReadObject()); + } + return retA; + + } + case apache::geode::client::GeodeTypeIds::CacheableLinkedList: + { + /*CacheableArrayList^ cv = 
static_cast<CacheableArrayList^>(CacheableArrayList::CreateDeserializable()); + cv->FromData(this); + return cv->Value;*/ + int len = ReadArrayLen(); + System::Collections::Generic::LinkedList<Object^>^ retA = gcnew System::Collections::Generic::LinkedList<Object^>(); + for (int i = 0; i < len; i++) + { + retA->AddLast(this->ReadObject()); + } + return retA; + + } + case apache::geode::client::GeodeTypeIds::CacheableStack: + { + CacheableStack^ cv = static_cast<CacheableStack^>(CacheableStack::CreateDeserializable()); + cv->FromData(this); + return cv->Value; + } + default: + return nullptr; + } + } + + Object^ DataInput::ReadInternalObject() + { + try + { + //Log::Debug("DataInput::ReadInternalObject m_cursor " + m_cursor); + bool findinternal = false; + int8_t typeId = ReadByte(); + System::Int64 compId = typeId; + TypeFactoryMethodGeneric^ createType = nullptr; + + if (compId == GeodeTypeIds::NullObj) { + return nullptr; + } + else if (compId == GeodeClassIds::PDX) + { + //cache current state and reset after reading pdx object + int cacheCursor = m_cursor; + System::Byte* cacheBuffer = m_buffer; + unsigned int cacheBufferLength = m_bufferLength; + Object^ ret = Internal::PdxHelper::DeserializePdx(this, false, CacheRegionHelper::getCacheImpl(m_cache)->getSerializationRegistry().get()); + int tmp = m_nativeptr->get()->getBytesRemaining(); + m_cursor = cacheBufferLength - tmp; + m_buffer = cacheBuffer; + m_bufferLength = cacheBufferLength; + m_nativeptr->get()->rewindCursor(m_cursor); + + if (ret != nullptr) + { + PdxWrapper^ pdxWrapper = dynamic_cast<PdxWrapper^>(ret); + + if (pdxWrapper != nullptr) + { + return pdxWrapper->GetObject(); + } + } + return ret; + } + else if (compId == GeodeClassIds::PDX_ENUM) + { + int8_t dsId = ReadByte(); + int tmp = ReadArrayLen(); + int enumId = (dsId << 24) | (tmp & 0xFFFFFF); + + Object^ enumVal = Internal::PdxHelper::GetEnum(enumId, m_cache); + return enumVal; + } + else if (compId == GeodeTypeIds::CacheableNullString) { + 
//return SerializablePtr(CacheableString::createDeserializable()); + //TODO:: + return nullptr; + } + else if (compId == GeodeTypeIdsImpl::CacheableUserData) { + int8_t classId = ReadByte(); + //compId |= ( ( (System::Int64)classId ) << 32 ); + compId = (System::Int64)classId; + } + else if (compId == GeodeTypeIdsImpl::CacheableUserData2) { + System::Int16 classId = ReadInt16(); + //compId |= ( ( (System::Int64)classId ) << 32 ); + compId = (System::Int64)classId; + } + else if (compId == GeodeTypeIdsImpl::CacheableUserData4) { + System::Int32 classId = ReadInt32(); + //compId |= ( ( (System::Int64)classId ) << 32 ); + compId = (System::Int64)classId; + } + else if (compId == GeodeTypeIdsImpl::FixedIDByte) {//TODO: need to verify again + int8_t fixedId = ReadByte(); + compId = fixedId; + findinternal = true; + } + else if (compId == GeodeTypeIdsImpl::FixedIDShort) { + System::Int16 fixedId = ReadInt16(); + compId = fixedId; + findinternal = true; + } + else if (compId == GeodeTypeIdsImpl::FixedIDInt) { + System::Int32 fixedId = ReadInt32(); + compId = fixedId; + findinternal = true; + } + if (findinternal) { + compId += 0x80000000; + createType = Serializable::GetManagedDelegateGeneric((System::Int64)compId); + } + else { + createType = Serializable::GetManagedDelegateGeneric(compId); + if (createType == nullptr) + { + Object^ retVal = ReadDotNetTypes(typeId); + + if (retVal != nullptr) + return retVal; + + if (m_ispdxDesrialization && typeId == apache::geode::client::GeodeTypeIds::CacheableObjectArray) + {//object array and pdxSerialization + return readDotNetObjectArray(); + } + compId += 0x80000000; + createType = Serializable::GetManagedDelegateGeneric(compId); + + /*if (createType == nullptr) + { + //TODO:: final check for user type if its not in cache + compId -= 0x80000000; + createType = Serializable::GetManagedDelegate(compId); + }*/ + } + } + + if (createType == nullptr) { + throw gcnew IllegalStateException("Unregistered typeId " + typeId + " in 
deserialization, aborting."); + } + + bool isPdxDeserialization = m_ispdxDesrialization; + m_ispdxDesrialization = false;//for nested objects + IGeodeSerializable^ newObj = createType(); + newObj->FromData(this); + m_ispdxDesrialization = isPdxDeserialization; + return newObj; + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + + Object^ DataInput::readDotNetObjectArray() + { + int len = ReadArrayLen(); + String^ className = nullptr; + if (len >= 0) + { + ReadByte(); // ignore CLASS typeid + className = (String^)ReadObject(); + className = Serializable::GetLocalTypeName(className); + System::Collections::IList^ list = nullptr; + if (len == 0) + { + list = (System::Collections::IList^)Serializable::GetArrayObject(className, len); + return list; + } + //read first object + + Object^ ret = ReadObject();//in case it returns pdxinstance or java.lang.object + + list = (System::Collections::IList^)Serializable::GetArrayObject(ret->GetType()->FullName, len); + + list[0] = ret; + for (System::Int32 index = 1; index < list->Count; ++index) + { + list[index] = ReadObject(); + } + return list; + } + return nullptr; + } + + Object^ DataInput::ReadInternalGenericObject() + { + bool findinternal = false; + int8_t typeId = ReadByte(); + System::Int64 compId = typeId; + TypeFactoryMethodGeneric^ createType = nullptr; + + if (compId == GeodeTypeIds::NullObj) { + return nullptr; + } + else if (compId == GeodeClassIds::PDX) + { + return Internal::PdxHelper::DeserializePdx(this, false, CacheRegionHelper::getCacheImpl(m_cache)->getSerializationRegistry().get()); + } + else if (compId == GeodeTypeIds::CacheableNullString) { + //return SerializablePtr(CacheableString::createDeserializable()); + //TODO:: + return nullptr; + } + else if (compId == GeodeTypeIdsImpl::CacheableUserData) { + int8_t classId = ReadByte(); + //compId |= ( ( (System::Int64)classId ) << 32 ); + compId = (System::Int64)classId; + } + else if (compId == GeodeTypeIdsImpl::CacheableUserData2) { + System::Int16 
classId = ReadInt16(); + //compId |= ( ( (System::Int64)classId ) << 32 ); + compId = (System::Int64)classId; + } + else if (compId == GeodeTypeIdsImpl::CacheableUserData4) { + System::Int32 classId = ReadInt32(); + //compId |= ( ( (System::Int64)classId ) << 32 ); + compId = (System::Int64)classId; + } + else if (compId == GeodeTypeIdsImpl::FixedIDByte) {//TODO: need to verify again + int8_t fixedId = ReadByte(); + compId = fixedId; + findinternal = true; + } + else if (compId == GeodeTypeIdsImpl::FixedIDShort) { + System::Int16 fixedId = ReadInt16(); + compId = fixedId; + findinternal = true; + } + else if (compId == GeodeTypeIdsImpl::FixedIDInt) { + System::Int32 fixedId = ReadInt32(); + compId = fixedId; + findinternal = true; + } + if (findinternal) { + compId += 0x80000000; + createType = Serializable::GetManagedDelegateGeneric((System::Int64)compId); + } + else { + createType = Serializable::GetManagedDelegateGeneric(compId); + if (createType == nullptr) + { + Object^ retVal = ReadDotNetTypes(typeId); + + if (retVal != nullptr) + return retVal; + + compId += 0x80000000; + createType = Serializable::GetManagedDelegateGeneric(compId); + } + } + + if (createType != nullptr) + { + IGeodeSerializable^ newObj = createType(); + newObj->FromData(this); + return newObj; + } + + throw gcnew IllegalStateException("Unregistered typeId in deserialization, aborting."); + } + + System::UInt32 DataInput::BytesRead::get() + { + AdvanceUMCursor(); + SetBuffer(); + + try + { + return m_nativeptr->get()->getBytesRead(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + + System::UInt32 DataInput::BytesReadInternally::get() + { + return m_cursor; + } + + System::UInt32 DataInput::BytesRemaining::get() + { + AdvanceUMCursor(); + SetBuffer(); + try + { + return m_nativeptr->get()->getBytesRemaining(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + + void DataInput::AdvanceCursor(System::Int32 offset) + { + m_cursor += offset; + } + + void 
DataInput::RewindCursor(System::Int32 offset) + { + AdvanceUMCursor(); + try + { + m_nativeptr->get()->rewindCursor(offset); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + SetBuffer(); + } + + void DataInput::Reset() + { + AdvanceUMCursor(); + try + { + m_nativeptr->get()->reset(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + SetBuffer(); + } + + void DataInput::Cleanup() + { + //TODO: + //GF_SAFE_DELETE_ARRAY(m_buffer); + } + + void DataInput::ReadDictionary(System::Collections::IDictionary^ dict) + { + int len = this->ReadArrayLen(); + + if (len > 0) + { + for (int i = 0; i < len; i++) + { + Object^ key = this->ReadObject(); + Object^ val = this->ReadObject(); + + dict->Add(key, val); + } + } + } + + IDictionary<Object^, Object^>^ DataInput::ReadDictionary() + { + int len = this->ReadArrayLen(); + + if (len == -1) + return nullptr; + else + { + IDictionary<Object^, Object^>^ dict = gcnew Dictionary<Object^, Object^>(); + for (int i = 0; i < len; i++) + { + Object^ key = this->ReadObject(); + Object^ val = this->ReadObject(); + + dict->Add(key, val); + } + return dict; + } + } + + System::DateTime DataInput::ReadDate() + { + long ticks = (long)ReadInt64(); + if (ticks != -1L) + { + m_cursor -= 8;//for above + CacheableDate^ cd = CacheableDate::Create(); + cd->FromData(this); + return cd->Value; + } + else + { + DateTime dt(0); + return dt; + } + } + + void DataInput::ReadCollection(System::Collections::IList^ coll) + { + int len = ReadArrayLen(); + for (int i = 0; i < len; i++) + { + coll->Add(ReadObject()); + } + } + + array<Char>^ DataInput::ReadCharArray() + { + array<Char>^ arr; + this->ReadObject(arr); + return arr; + } + + array<bool>^ DataInput::ReadBooleanArray() + { + array<bool>^ arr; + this->ReadObject(arr); + return arr; + } + + array<Int16>^ DataInput::ReadShortArray() + { + array<Int16>^ arr; + this->ReadObject(arr); + return arr; + } + + array<Int32>^ DataInput::ReadIntArray() + { + array<Int32>^ arr; + this->ReadObject(arr); + 
return arr; + } + + array<Int64>^ DataInput::ReadLongArray() + { + array<Int64>^ arr; + this->ReadObject(arr); + return arr; + } + + array<float>^ DataInput::ReadFloatArray() + { + array<float>^ arr; + this->ReadObject(arr); + return arr; + } + + array<double>^ DataInput::ReadDoubleArray() + { + array<double>^ arr; + this->ReadObject(arr); + return arr; + } + + List<Object^>^ DataInput::ReadObjectArray() + { + //this to know whether it is null or it is empty + int storeCursor = m_cursor; + int len = this->ReadArrayLen(); + if (len == -1) + return nullptr; + //this will be read further by fromdata + m_cursor = m_cursor - (m_cursor - storeCursor); + + + CacheableObjectArray^ coa = CacheableObjectArray::Create(); + coa->FromData(this); + List<Object^>^ retObj = (List<Object^>^)coa; + + if (retObj->Count >= 0) + return retObj; + return nullptr; + } + + array<array<Byte>^>^ DataInput::ReadArrayOfByteArrays() + { + int len = ReadArrayLen(); + if (len >= 0) + { + array<array<Byte>^>^ retVal = gcnew array<array<Byte>^>(len); + for (int i = 0; i < len; i++) + { + retVal[i] = this->ReadBytes(); + } + return retVal; + } + else + return nullptr; + } + + void DataInput::ReadObject(array<UInt16>^% obj) + { + int len = ReadArrayLen(); + if (len >= 0) + { + obj = gcnew array<UInt16>(len); + for (int i = 0; i < len; i++) + { + obj[i] = this->ReadUInt16(); + } + } + } + + void DataInput::ReadObject(array<UInt32>^% obj) + { + int len = ReadArrayLen(); + if (len >= 0) + { + obj = gcnew array<UInt32>(len); + for (int i = 0; i < len; i++) + { + obj[i] = this->ReadUInt32(); + } + } + } + + void DataInput::ReadObject(array<UInt64>^% obj) + { + int len = ReadArrayLen(); + if (len >= 0) + { + obj = gcnew array<UInt64>(len); + for (int i = 0; i < len; i++) + { + obj[i] = this->ReadUInt64(); + } + } + } + + String^ DataInput::ReadString() + { + UInt32 typeId = (Int32)ReadByte(); + + if (typeId == GeodeTypeIds::CacheableNullString) + return nullptr; + + if (typeId == 
GeodeTypeIds::CacheableASCIIString || + typeId == GeodeTypeIds::CacheableString) + { + return ReadUTF(); + } + else if (typeId == GeodeTypeIds::CacheableASCIIStringHuge) + { + return ReadASCIIHuge(); + } + else + { + return ReadUTFHuge(); + } // namespace Client + } // namespace Geode + } // namespace Apache + + } +} http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/src/DataInput.hpp ---------------------------------------------------------------------- diff --git a/clicache/src/DataInput.hpp b/clicache/src/DataInput.hpp new file mode 100644 index 0000000..80ed2b0 --- /dev/null +++ b/clicache/src/DataInput.hpp @@ -0,0 +1,711 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "geode_defs.hpp" +#include "begin_native.hpp" +#include <geode/DataInput.hpp> +#include "end_native.hpp" + +#include "native_conditional_unique_ptr.hpp" +#include "Log.hpp" +#include "ExceptionTypes.hpp" + +using namespace System; +using namespace System::Collections::Generic; + +namespace Apache +{ + namespace Geode + { + namespace Client + { + + namespace native = apache::geode::client; + + interface class IGeodeSerializable; + + /// <summary> + /// Provides operations for reading primitive data values, byte arrays, + /// strings, <c>IGeodeSerializable</c> objects from a byte stream. + /// </summary> + public ref class DataInput sealed + { + public: + + /// <summary> + /// Construct <c>DataInput</c> using an given array of bytes. + /// </summary> + /// <param name="buffer"> + /// The buffer to use for reading data values + /// </param> + /// <exception cref="IllegalArgumentException"> + /// if the buffer is null + /// </exception> + DataInput( array<Byte>^ buffer, const native::Cache* cache ); + + /// <summary> + /// Construct <c>DataInput</c> using a given length of an array of + /// bytes. + /// </summary> + /// <param name="buffer"> + /// The buffer to use for reading data values. + /// </param> + /// <param name="len"> + /// The number of bytes from the start of the buffer to use. + /// </param> + /// <exception cref="IllegalArgumentException"> + /// if the buffer is null + /// </exception> + DataInput( array<Byte>^ buffer, System::Int32 len, const native::Cache* cache ); + + /// <summary> + /// Dispose: frees the internal buffer. + /// </summary> + ~DataInput( ) { Cleanup( ); } + + /// <summary> + /// Finalizer: frees the internal buffer. + /// </summary> + !DataInput( ) { Cleanup( ); } + + /// <summary> + /// Read a signed byte from the stream. + /// </summary> + SByte ReadSByte( ); + + /// <summary> + /// Read a boolean value from the stream. 
+ /// </summary> + bool ReadBoolean( ); + + /// <summary> + /// Read a char value from the stream. + /// </summary> + Char ReadChar( ); + + /// <summary> + /// Read an array of bytes from the stream reading the length + /// from the stream first. + /// </summary> + array<Byte>^ ReadBytes( ); + + /// <summary> + /// Read an array of signed bytes from the stream reading the length + /// from the stream first. + /// </summary> + array<SByte>^ ReadSBytes( ); + + /// <summary> + /// Read the given number of bytes from the stream. + /// </summary> + /// <param name="len">Number of bytes to read.</param> + array<Byte>^ ReadBytesOnly( System::UInt32 len ); + + void ReadBytesOnly( array<Byte> ^ buffer, int offset, int count ); + + /// <summary> + /// Read the given number of signed bytes from the stream. + /// </summary> + /// <param name="len">Number of signed bytes to read.</param> + array<SByte>^ ReadSBytesOnly( System::UInt32 len ); + + /// <summary> + /// Read a array len based on array size. + /// </summary> + int ReadArrayLen( ); + + /// <summary> + /// Read a 16-bit integer from the stream. + /// </summary> + System::Int16 ReadInt16( ); + + /// <summary> + /// Read a 32-bit integer from the stream. + /// </summary> + System::Int32 ReadInt32( ); + + /// <summary> + /// Read a 64-bit integer from the stream. + /// </summary> + System::Int64 ReadInt64( ); + + /// <summary> + /// Read a floating point number from the stream. + /// </summary> + float ReadFloat( ); + + /// <summary> + /// Read a double precision number from the stream. + /// </summary> + double ReadDouble( ); + + /// <summary> + /// Read a string after java-modified UTF-8 decoding from the stream. + /// The maximum length supported is 2^16-1 beyond which the string + /// shall be truncated. + /// </summary> + String^ ReadUTF( ); + + /// <summary> + /// Read a string after java-modified UTF-8 decoding from the stream. 
+ /// </summary> + String^ ReadUTFHuge( ); + + /// <summary> + /// Read a ASCII string from the stream. Where size is more than 2^16-1 + /// </summary> + String^ ReadASCIIHuge( ); + + /// <summary> + /// Read a serializable object from the data. Null objects are handled. + /// </summary> + Object^ ReadObject( ); + + /// <summary> + /// Get the count of bytes that have been read from the stream. + /// </summary> + property System::UInt32 BytesRead + { + System::UInt32 get( ); + } + + /// <summary> + /// Get the count of bytes that are remaining in the buffer. + /// </summary> + property System::UInt32 BytesRemaining + { + System::UInt32 get(); + } + + /// <summary> + /// Advance the cursor of the buffer by the given offset. + /// </summary> + /// <param name="offset"> + /// The offset(number of bytes) by which to advance the cursor. + /// </param> + void AdvanceCursor( System::Int32 offset ); + + /// <summary> + /// Rewind the cursor of the buffer by the given offset. + /// </summary> + /// <param name="offset"> + /// The offset(number of bytes) by which to rewind the cursor. + /// </param> + void RewindCursor( System::Int32 offset ); + + /// <summary> + /// Reset the cursor to the start of buffer. + /// </summary> + void Reset(); + + /// <summary> + /// Read a dictionary from the stream in a given dictionary instance. + /// </summary> + /// <param name="dictionary">Object which implements System::Collections::IDictionary interface.</param> + void ReadDictionary(System::Collections::IDictionary^ dictionary); + + /// <summary> + /// Read a date from the stream. + /// </summary> + System::DateTime ReadDate( ); + + /// <summary> + /// Read a collection from the stream in a given collection instance. + /// </summary> + /// <param name="list">Object which implements System::Collections::IList interface.</param> + void ReadCollection(System::Collections::IList^ list); + + /// <summary> + /// Read a char array from the stream. 
+ /// </summary> + array<Char>^ ReadCharArray( ); + + /// <summary> + /// Read a bool array from the stream. + /// </summary> + array<bool>^ ReadBooleanArray( ); + + /// <summary> + /// Read a short int array from the stream. + /// </summary> + array<Int16>^ ReadShortArray( ); + + /// <summary> + /// Read a int array from the stream. + /// </summary> + array<Int32>^ ReadIntArray(); + + /// <summary> + /// Read a long array from the stream. + /// </summary> + array<Int64>^ ReadLongArray(); + + /// <summary> + /// Read a float array from the stream. + /// </summary> + array<float>^ ReadFloatArray(); + + /// <summary> + /// Read a double array from the stream. + /// </summary> + array<double>^ ReadDoubleArray(); + + /// <summary> + /// Read a object array from the stream from the stream. + /// </summary> + List<Object^>^ ReadObjectArray(); + + /// <summary> + /// Read a array of signed byte array from the stream. + /// </summary> + array<array<Byte>^>^ ReadArrayOfByteArrays( ); + + internal: + + native::DataInput* GetNative() + { + return m_nativeptr->get(); + } + + void setPdxdeserialization(bool val) + { + m_ispdxDesrialization = true; + } + bool isRootObjectPdx() + { + return m_isRootObjectPdx; + } + void setRootObjectPdx(bool val) + { + m_isRootObjectPdx = val; + } + + Object^ readDotNetObjectArray(); + System::Collections::Generic::IDictionary<Object^, Object^>^ ReadDictionary(); + + String^ ReadString(); + + const char * GetPoolName() + { + try + { + return m_nativeptr->get()->getPoolName(); + } + finally { + GC::KeepAlive(m_nativeptr); + } + } + + Object^ ReadDotNetTypes(int8_t typeId); + + /// <summary> + /// Get the count of bytes that have been read from the stream, for internal use only. 
+ /// </summary> + property System::UInt32 BytesReadInternally + { + System::UInt32 get( ); + } + + void ReadObject(bool% obj) + { + obj = ReadBoolean(); + } + + void ReadObject(Byte% obj) + { + obj = ReadByte(); + } + + void ReadObject(Char% obj) + { + obj = (Char)ReadUInt16(); + } + + inline Char decodeChar( ) + { + Char retChar; + int b = m_buffer[ m_cursor++ ] & 0xff; + int k = b >> 5; + switch ( k ) + { + default: + retChar = ( Char ) ( b & 0x7f ); + break; + case 6: + { + // two byte encoding + // 110yyyyy 10xxxxxx + // use low order 6 bits + int y = b & 0x1f; + // use low order 6 bits of the next byte + // It should have high order bits 10, which we don't check. + int x = m_buffer[ m_cursor++ ] & 0x3f; + // 00000yyy yyxxxxxx + retChar = ( Char ) ( y << 6 | x ); + break; + } + case 7: + { + // three byte encoding + // 1110zzzz 10yyyyyy 10xxxxxx + //assert ( b & 0x10 ) + // == 0 : "UTF8Decoder does not handle 32-bit characters"; + // use low order 4 bits + int z = b & 0x0f; + // use low order 6 bits of the next byte + // It should have high order bits 10, which we don't check. + int y = m_buffer[ m_cursor++ ] & 0x3f; + // use low order 6 bits of the next byte + // It should have high order bits 10, which we don't check. + int x = m_buffer[ m_cursor++ ] & 0x3f; + // zzzzyyyy yyxxxxxx + int asint = ( z << 12 | y << 6 | x ); + retChar = ( Char ) asint; + break; + } + }// end switch + + return retChar; + } + + System::Collections::Hashtable^ ReadHashtable() + { + int len = this->ReadArrayLen(); + + if(len == -1) + return nullptr; + else + { + System::Collections::Hashtable^ dict = gcnew System::Collections::Hashtable(); + for(int i =0; i< len; i++) + { + Object^ key = this->ReadObject(); + Object^ val = this->ReadObject(); + + dict->Add(key, val); + } + return dict; + } + } + + /// <summary> + /// Read a byte from the stream. + /// </summary> + Byte ReadByte( ); + + /// <summary> + /// Read a 16-bit unsigned integer from the stream. 
+ /// </summary> + System::UInt16 ReadUInt16( ); + + /// <summary> + /// Read a 32-bit unsigned integer from the stream. + /// </summary> + System::UInt32 ReadUInt32( ); + + /// <summary> + /// Read a 64-bit unsigned integer from the stream. + /// </summary> + System::UInt64 ReadUInt64( ); + + void ReadObject(Double% obj) + { + obj = ReadDouble(); + } + + void ReadObject(Single% obj) + { + obj = ReadFloat(); + } + + void ReadObject(System::Int16% obj) + { + obj = ReadInt16(); + } + + void ReadObject(System::Int32% obj) + { + obj = ReadInt32(); + } + + void ReadObject(System::Int64% obj) + { + obj = ReadInt64(); + } + + void ReadObject(array<SByte>^% obj) + { + obj = ReadSBytes(); + } + + void DataInput::ReadObject(array<UInt16>^% obj); + void DataInput::ReadObject(array<UInt32>^% obj); + void DataInput::ReadObject(array<UInt64>^% obj); + + template <typename mType> + void ReadObject(array<mType>^ %objArray) + { + int arrayLen = ReadArrayLen(); + if(arrayLen >= 0) { + objArray = gcnew array<mType>(arrayLen); + + int i = 0; + for( i = 0; i < arrayLen; i++ ){ + mType tmp; + ReadObject(tmp); + objArray[i] = tmp; + } + } + } + + array<String^>^ ReadStringArray() + { + int len = this->ReadArrayLen(); + if ( len == -1) + { + return nullptr; + } + else + { + array<String^>^ ret = gcnew array<String^>(len); + if (len > 0) + { + for( int i = 0; i < len; i++) + { + Object^ obj = this->ReadObject(); + if(obj != nullptr) + ret[i] = static_cast<String^>(obj); + else + ret[i] = nullptr; + } + } + return ret; + } + } + + System::Byte* GetCursor() + { + return m_buffer + m_cursor; + } + + System::Byte* GetBytes(System::Byte* src, System::UInt32 size) + { + try + { + return m_nativeptr->get()->getBufferCopyFrom(src, size); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + + + void AdvanceUMCursor() + { + try { + m_nativeptr->get()->advanceCursor(m_cursor); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + m_cursor = 0; + m_bufferLength = 0; + } + + void 
AdvanceCursorPdx(int offset) + { + m_cursor += offset; + } + + void RewindCursorPdx(int rewind) + { + m_cursor = 0; + } + + void ResetAndAdvanceCursorPdx(int offset) + { + m_cursor = offset; + } + + void ResetPdx(int offset) + { + try + { + m_nativeptr->get()->reset(offset); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + SetBuffer(); + } + + inline array<Byte>^ ReadReverseBytesOnly(int len); + + void SetBuffer() + { + try + { + m_buffer = const_cast<System::Byte*> (m_nativeptr->get()->currentBufferPosition()); + m_cursor = 0; + m_bufferLength = m_nativeptr->get()->getBytesRemaining(); + } + finally + { + GC::KeepAlive(m_nativeptr); + } + } + + String^ DecodeBytes(int length) + { + //array<Char>^ output = gcnew array<Char>(length); + + if(m_forStringDecode->Length < length) + m_forStringDecode = gcnew array<Char>(length); + // index input[] + int i = 0; + // index output[] + int j = 0; + while ( i < length ) + { + // get next byte unsigned + //Byte b = m_buffer[ m_cursor++ ] & 0xff; + Byte b = ReadByte(); + i++; + Byte k = b >> 5; + // classify based on the high order 3 bits + switch ( k ) + { + default: + // one byte encoding + // 0xxxxxxx + // use just low order 7 bits + // 00000000 0xxxxxxx + m_forStringDecode[ j++ ] = ( Char ) ( b & 0x7f ); + break; + case 6: + { + // two byte encoding + // 110yyyyy 10xxxxxx + // use low order 6 bits + int y = b & 0x1f; + // use low order 6 bits of the next byte + // It should have high order bits 10, which we don't check. + int x = m_buffer[ m_cursor++ ] & 0x3f; + i++; + // 00000yyy yyxxxxxx + m_forStringDecode[ j++ ] = ( Char ) ( y << 6 | x ); + break; + } + case 7: + { + // three byte encoding + // 1110zzzz 10yyyyyy 10xxxxxx + //assert ( b & 0x10 ) + // == 0 : "UTF8Decoder does not handle 32-bit characters"; + // use low order 4 bits + int z = b & 0x0f; + // use low order 6 bits of the next byte + // It should have high order bits 10, which we don't check. 
+ int y = m_buffer[ m_cursor++ ] & 0x3f; + i++; + // use low order 6 bits of the next byte + // It should have high order bits 10, which we don't check. + int x = m_buffer[ m_cursor++ ] & 0x3f; + i++; + // zzzzyyyy yyxxxxxx + int asint = ( z << 12 | y << 6 | x ); + m_forStringDecode[ j++ ] = ( Char ) asint; + break; + } + }// end switch + }// end while + + String^ str = gcnew String(m_forStringDecode, 0, j); + return str; + } + + void CheckBufferSize(int size); + + + Object^ ReadInternalGenericObject(); + + Object^ ReadInternalObject(); + + DataInput^ GetClone(); + + /// <summary> + /// Internal constructor to wrap a native object pointer + /// </summary> + /// <param name="nativeptr">The native object pointer</param> + inline DataInput( apache::geode::client::DataInput* nativeptr, bool managedObject, const native::Cache* cache ) + { + m_nativeptr = gcnew native_conditional_unique_ptr<native::DataInput>(nativeptr); + m_ispdxDesrialization = false; + m_isRootObjectPdx = false; + m_cache = cache; + m_cursor = 0; + m_isManagedObject = managedObject; + m_forStringDecode = gcnew array<Char>(100); + m_buffer = const_cast<System::Byte*>(nativeptr->currentBufferPosition()); + if ( m_buffer != NULL) { + m_bufferLength = nativeptr->getBytesRemaining(); + } + else { + m_bufferLength = 0; + } + } + + DataInput( System::Byte* buffer, int size, const native::Cache* cache ); + + bool IsManagedObject() + { + return m_isManagedObject; + } + + int GetPdxBytes() + { + return m_bufferLength; + } + + private: + + /// <summary> + /// Internal buffer managed by the class. + /// This is freed in the disposer/destructor. 
+ /// </summary> + bool m_ispdxDesrialization; + bool m_isRootObjectPdx; + const native::Cache* m_cache; + System::Byte* m_buffer; + unsigned int m_bufferLength; + int m_cursor; + bool m_isManagedObject; + array<Char>^ m_forStringDecode; + + native_conditional_unique_ptr<native::DataInput>^ m_nativeptr; + + void Cleanup( ); + }; + } // namespace Client + } // namespace Geode +} // namespace Apache +