IGNITE-3575 CPP: Added support for continuous queries remote filters.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4da92b7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4da92b7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4da92b7 Branch: refs/heads/master Commit: d4da92b7ab2625c1e09d8ae06bf1eb9393162c8e Parents: 79bac4f Author: Igor Sapego <[email protected]> Authored: Fri Mar 31 16:40:25 2017 +0300 Committer: Igor Sapego <[email protected]> Committed: Fri Mar 31 16:40:25 2017 +0300 ---------------------------------------------------------------------- .../ignite/impl/binary/binary_type_impl.h | 2 +- .../common/include/ignite/common/concurrent.h | 30 +++ .../cpp/common/include/ignite/reference.h | 14 +- .../cpp/core-test/config/cache-test.xml | 2 +- .../project/vs/core-test.vcxproj.filters | 3 + .../cpp/core-test/src/cache_invoke_test.cpp | 6 +- .../platforms/cpp/core-test/src/cache_test.cpp | 23 ++- .../cpp/core-test/src/continuous_query_test.cpp | 202 ++++++++++++++++++- .../cpp/core-test/src/reference_test.cpp | 12 +- modules/platforms/cpp/core/Makefile.am | 1 + modules/platforms/cpp/core/include/Makefile.am | 70 ++++--- .../cpp/core/include/ignite/cache/cache.h | 22 +- .../ignite/cache/cache_entry_processor.h | 42 +--- .../cache/event/cache_entry_event_filter.h | 109 ++++++++++ .../cache/query/continuous/continuous_query.h | 35 +++- .../cpp/core/include/ignite/ignite_binding.h | 39 +++- .../include/ignite/ignite_binding_context.h | 2 +- .../cpp/core/include/ignite/impl/bindings.h | 95 +++++++++ .../impl/cache/cache_entry_processor_holder.h | 15 -- .../core/include/ignite/impl/cache/cache_impl.h | 81 +------- .../cache/event/cache_entry_event_filter_base.h | 66 ++++++ .../event/cache_entry_event_filter_holder.h | 185 +++++++++++++++++ .../continuous/continuous_query_handle_impl.h | 10 - .../query/continuous/continuous_query_impl.h | 60 +++++- .../include/ignite/impl/ignite_binding_impl.h | 101 +++++----- 
.../include/ignite/impl/ignite_environment.h | 37 ++-- .../cpp/core/include/ignite/impl/ignite_impl.h | 10 +- .../cpp/core/include/ignite/impl/operations.h | 2 +- .../platforms/cpp/core/project/vs/core.vcxproj | 5 + .../cpp/core/project/vs/core.vcxproj.filters | 18 ++ .../cpp/core/src/impl/cache/cache_impl.cpp | 90 ++++++++- .../continuous/continuous_query_handle_impl.cpp | 5 - .../cpp/core/src/impl/ignite_binding_impl.cpp | 88 ++++++++ .../cpp/core/src/impl/ignite_environment.cpp | 124 ++++++++++-- .../platforms/cpp/core/src/impl/ignite_impl.cpp | 2 +- 35 files changed, 1284 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h index d0cbb86..08c60c0 100644 --- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h +++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h @@ -103,7 +103,7 @@ namespace ignite ignite::binary::BinaryType<T> bt; ignite::Reference<ignite::binary::BinaryIdentityResolver> resolver = bt.GetIdentityResolver(); - return resolver.Get().GetHashCode(obj); + return resolver.Get()->GetHashCode(obj); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/common/include/ignite/common/concurrent.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h index 84a1f0e..69b8eda 100644 --- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h +++ 
b/modules/platforms/cpp/common/include/ignite/common/concurrent.h @@ -30,6 +30,11 @@ namespace ignite namespace concurrent { /** + * Type tag for static pointer cast. + */ + struct StaticTag {}; + + /** * Default deleter implementation. * * @param obj Object to be deleted. @@ -198,6 +203,20 @@ namespace ignite } /** + * Static-cast constructor. + * + * @param other Instance to copy. + */ + template<typename T2> + SharedPointer(const SharedPointer<T2>& other, StaticTag) : + ptr(static_cast<T*>(other.ptr)), + impl(other.impl) + { + if (impl) + impl->Increment(); + } + + /** * Assignment operator. * * @param other Other instance. @@ -313,6 +332,17 @@ namespace ignite }; /** + * Enables static-cast semantics for SharedPointer. + * + * @param val Value to cast. + */ + template<class T1, class T2> + SharedPointer<T1> StaticPointerCast(const SharedPointer<T2>& val) + { + return SharedPointer<T1>(val, StaticTag()); + } + + /** * The class provides functionality that allows objects of derived * classes to create instances of shared_ptr pointing to themselves * and sharing ownership with existing shared_ptr objects. http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/common/include/ignite/reference.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/reference.h b/modules/platforms/cpp/common/include/ignite/reference.h index b026ad7..08cccec 100644 --- a/modules/platforms/cpp/common/include/ignite/reference.h +++ b/modules/platforms/cpp/common/include/ignite/reference.h @@ -160,9 +160,9 @@ namespace ignite * * @return Constant reference to underlying value. 
*/ - const T& Get() const + const T* Get() const { - return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset); + return reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset); } /** @@ -326,11 +326,11 @@ namespace ignite * If the pointer is null then this operation causes undefined * behaviour. * - * @return Constant reference to underlying value. + * @return Constant pointer to underlying value. */ - const T& Get() const + const T* Get() const { - return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset); + return reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset); } /** @@ -341,9 +341,9 @@ namespace ignite * * @return Reference to underlying value. */ - T& Get() + T* Get() { - return *reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset); + return reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/config/cache-test.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/config/cache-test.xml b/modules/platforms/cpp/core-test/config/cache-test.xml index 0ea5876..10300ba 100644 --- a/modules/platforms/cpp/core-test/config/cache-test.xml +++ b/modules/platforms/cpp/core-test/config/cache-test.xml @@ -55,7 +55,7 @@ <property name="cacheMode" value="PARTITIONED"/> <property name="atomicityMode" value="TRANSACTIONAL"/> </bean> - + <bean parent="cache-template"> <property name="name" value="partitioned2"/> <property name="cacheMode" value="PARTITIONED"/> http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters 
b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters index fb0be1b..5181f96 100644 --- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters +++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters @@ -142,5 +142,8 @@ <None Include="..\..\config\cache-store.xml"> <Filter>Configs</Filter> </None> + <None Include="..\..\config\cache-query-continuous.xml"> + <Filter>Configs</Filter> + </None> </ItemGroup> </Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp b/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp index db304e2..4f1f30a 100644 --- a/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp +++ b/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp @@ -44,7 +44,7 @@ using namespace ignite::common; /** * CacheEntryModifier class for invoke tests. */ -class CacheEntryModifier : public CacheEntryProcessor<CacheEntryModifier, int, int, int, int> +class CacheEntryModifier : public CacheEntryProcessor<int, int, int, int> { public: /** @@ -151,7 +151,7 @@ namespace ignite /** * Divisor class for invoke tests. */ -class Divisor : public CacheEntryProcessor<Divisor, int, int, double, double> +class Divisor : public CacheEntryProcessor<int, int, double, double> { public: /** @@ -262,7 +262,7 @@ namespace ignite /** * Character remover class for invoke tests. 
*/ -class CharRemover : public CacheEntryProcessor<CharRemover, std::string, std::string, int, bool> +class CharRemover : public CacheEntryProcessor<std::string, std::string, int, bool> { public: /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/cache_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/cache_test.cpp b/modules/platforms/cpp/core-test/src/cache_test.cpp index 437ed234..d57b757 100644 --- a/modules/platforms/cpp/core-test/src/cache_test.cpp +++ b/modules/platforms/cpp/core-test/src/cache_test.cpp @@ -29,16 +29,6 @@ using namespace ignite; using namespace boost::unit_test; -/* Nodes started during the test. */ -Ignite grid0 = Ignite(); -Ignite grid1 = Ignite(); - -/** Cache accessor. */ -cache::Cache<int, int> Cache() -{ - return grid0.GetCache<int, int>("partitioned"); -} - struct Person { std::string name; @@ -88,7 +78,18 @@ namespace ignite /* * Test setup fixture. */ -struct CacheTestSuiteFixture { +struct CacheTestSuiteFixture +{ + /* Nodes started during the test. */ + Ignite grid0; + Ignite grid1; + + /** Cache accessor. */ + cache::Cache<int, int> Cache() + { + return grid0.GetCache<int, int>("partitioned"); + } + /* * Constructor. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/continuous_query_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp index 1be21c1..f81eb5d 100644 --- a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp +++ b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp @@ -175,6 +175,61 @@ private: ConcurrentQueue< CacheEntryEvent<K, V> > eventQueue; }; +/** + * Only lets through keys from the range. 
+ */ +template<typename K, typename V> +struct RangeFilter : CacheEntryEventFilter<K, V> +{ + /** + * Default constructor. + */ + RangeFilter() : + rangeBegin(0), + rangeEnd(0) + { + // No-op. + } + + /** + * Constructor. + * + * @param from Range beginning. Inclusive. + * @param to Range end. Not inclusive. + */ + RangeFilter(const K& from, const K& to) : + rangeBegin(from), + rangeEnd(to) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~RangeFilter() + { + // No-op. + } + + /** + * Event callback. + * + * @param event Event. + * @return True if the event passes filter. + */ + virtual bool Process(const CacheEntryEvent<K, V>& event) + { + return event.GetKey() >= rangeBegin && event.GetKey() < rangeEnd; + } + + /** Beginning of the range. */ + K rangeBegin; + + /** End of the range. */ + K rangeEnd; +}; + /* * Test entry. */ @@ -204,10 +259,9 @@ namespace ignite { namespace binary { - /** - * Binary type definition. - */ - IGNITE_BINARY_TYPE_START(TestEntry) + template<> + struct BinaryType<TestEntry> + { IGNITE_BINARY_GET_TYPE_ID_AS_HASH(TestEntry) IGNITE_BINARY_GET_TYPE_NAME_AS_IS(TestEntry) IGNITE_BINARY_GET_FIELD_ID_AS_HASH @@ -227,8 +281,52 @@ namespace ignite return res; } + }; - IGNITE_BINARY_TYPE_END + template<typename K, typename V> + struct BinaryType< RangeFilter<K,V> > + { + int32_t GetTypeId() + { + return GetBinaryStringHashCode("RangeFilter"); + } + + std::string GetTypeName() + { + return "RangeFilter"; + + } + IGNITE_BINARY_GET_FIELD_ID_AS_HASH + + int32_t GetHashCode(const RangeFilter<K,V>&) + { + return 0; + } + + bool IsNull(const RangeFilter<K,V>&) + { + return false; + } + + RangeFilter<K,V> GetNull() + { + return RangeFilter<K,V>(); + } + + void Write(BinaryWriter& writer, const RangeFilter<K,V>& obj) + { + writer.WriteObject("rangeBegin", obj.rangeBegin); + writer.WriteObject("rangeEnd", obj.rangeEnd); + } + + RangeFilter<K,V> Read(BinaryReader& reader) + { + K begin = reader.ReadObject<K>("rangeBegin"); + K end = 
reader.ReadObject<K>("rangeEnd"); + + return RangeFilter<K,V>(begin, end); + } + }; } } @@ -237,7 +335,7 @@ namespace ignite */ struct ContinuousQueryTestSuiteFixture { - Ignite grid; + Ignite node; Cache<int, TestEntry> cache; @@ -245,8 +343,8 @@ struct ContinuousQueryTestSuiteFixture * Constructor. */ ContinuousQueryTestSuiteFixture() : - grid(ignite_test::StartNode("cache-query-continuous.xml", "node-01")), - cache(grid.GetCache<int, TestEntry>("transactional_no_backup")) + node(ignite_test::StartNode("cache-query-continuous.xml", "node-01")), + cache(node.GetCache<int, TestEntry>("transactional_no_backup")) { // No-op. } @@ -258,7 +356,7 @@ struct ContinuousQueryTestSuiteFixture { Ignition::StopAll(false); - grid = Ignite(); + node = Ignite(); } }; @@ -581,4 +679,90 @@ BOOST_AUTO_TEST_CASE(TestPublicPrivateConstantsConsistence) static_cast<int>(QueryType::DEFAULT_BUFFER_SIZE)); } +BOOST_AUTO_TEST_CASE(TestFilterSingleNode) +{ + node.GetBinding().RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >(); + + Listener<int, TestEntry> lsnr; + RangeFilter<int, TestEntry> filter(100, 150); + + ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr), MakeReference(filter)); + + ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry); + + cache.Put(1, TestEntry(10)); + cache.Put(1, TestEntry(11)); + + cache.Put(2, TestEntry(20)); + cache.Remove(2); + + cache.Put(100, TestEntry(1000)); + cache.Put(101, TestEntry(1010)); + + cache.Put(142, TestEntry(1420)); + cache.Put(142, TestEntry(1421)); + cache.Remove(142); + + cache.Put(149, TestEntry(1490)); + cache.Put(150, TestEntry(1500)); + cache.Put(150, TestEntry(1502)); + cache.Remove(150); + + lsnr.CheckNextEvent(100, boost::none, TestEntry(1000)); + lsnr.CheckNextEvent(101, boost::none, TestEntry(1010)); + + lsnr.CheckNextEvent(142, boost::none, TestEntry(1420)); + lsnr.CheckNextEvent(142, TestEntry(1420), TestEntry(1421)); + lsnr.CheckNextEvent(142, TestEntry(1421), boost::none); + + 
lsnr.CheckNextEvent(149, boost::none, TestEntry(1490)); +} + +BOOST_AUTO_TEST_CASE(TestFilterMultipleNodes) +{ + Ignite node2 = ignite_test::StartNode("cache-query-continuous.xml", "node-02"); + Ignite node3 = ignite_test::StartNode("cache-query-continuous.xml", "node-03"); + + node.GetBinding().RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >(); + + Listener<int, TestEntry> lsnr; + RangeFilter<int, TestEntry> filter(100, 150); + + ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr), MakeReference(filter)); + + ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry); + + Cache<int, TestEntry> cache2 = node2.GetCache<int, TestEntry>("transactional_no_backup"); + + cache2.Put(1, TestEntry(10)); + cache2.Put(1, TestEntry(11)); + + cache2.Put(2, TestEntry(20)); + cache2.Remove(2); + + cache2.Put(100, TestEntry(1000)); + cache2.Put(101, TestEntry(1010)); + + cache2.Put(142, TestEntry(1420)); + cache2.Put(142, TestEntry(1421)); + cache2.Remove(142); + + cache2.Put(149, TestEntry(1490)); + cache2.Put(150, TestEntry(1500)); + cache2.Put(150, TestEntry(1502)); + cache2.Remove(150); + + for (int i = 200; i < 250; ++i) + cache2.Put(i, TestEntry(i * 10)); + + lsnr.CheckNextEvent(100, boost::none, TestEntry(1000)); + lsnr.CheckNextEvent(101, boost::none, TestEntry(1010)); + + lsnr.CheckNextEvent(142, boost::none, TestEntry(1420)); + lsnr.CheckNextEvent(142, TestEntry(1420), TestEntry(1421)); + lsnr.CheckNextEvent(142, TestEntry(1421), boost::none); + + lsnr.CheckNextEvent(149, boost::none, TestEntry(1490)); +} + BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/reference_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/reference_test.cpp b/modules/platforms/cpp/core-test/src/reference_test.cpp index a5ac559..ec445c7 100644 --- 
a/modules/platforms/cpp/core-test/src/reference_test.cpp +++ b/modules/platforms/cpp/core-test/src/reference_test.cpp @@ -118,32 +118,32 @@ struct C3 : C1, C2 void TestFunction1(Reference<C1> c1, int expected) { - BOOST_CHECK_EQUAL(c1.Get().c1, expected); + BOOST_CHECK_EQUAL(c1.Get()->c1, expected); } void TestFunction2(Reference<C2> c2, int expected) { - BOOST_CHECK_EQUAL(c2.Get().c2, expected); + BOOST_CHECK_EQUAL(c2.Get()->c2, expected); } void TestFunction3(Reference<C3> c3, int expected) { - BOOST_CHECK_EQUAL(c3.Get().c3, expected); + BOOST_CHECK_EQUAL(c3.Get()->c3, expected); } void TestFunctionConst1(ConstReference<C1> c1, int expected) { - BOOST_CHECK_EQUAL(c1.Get().c1, expected); + BOOST_CHECK_EQUAL(c1.Get()->c1, expected); } void TestFunctionConst2(ConstReference<C2> c2, int expected) { - BOOST_CHECK_EQUAL(c2.Get().c2, expected); + BOOST_CHECK_EQUAL(c2.Get()->c2, expected); } void TestFunctionConst3(ConstReference<C3> c3, int expected) { - BOOST_CHECK_EQUAL(c3.Get().c3, expected); + BOOST_CHECK_EQUAL(c3.Get()->c3, expected); } BOOST_AUTO_TEST_SUITE(ReferenceTestSuite) http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/Makefile.am b/modules/platforms/cpp/core/Makefile.am index 46d6bc9..4de45d3 100644 --- a/modules/platforms/cpp/core/Makefile.am +++ b/modules/platforms/cpp/core/Makefile.am @@ -69,6 +69,7 @@ libignite_la_SOURCES = \ src/impl/transactions/transactions_impl.cpp \ src/impl/cluster/cluster_group_impl.cpp \ src/impl/ignite_impl.cpp \ + src/impl/ignite_binding_impl.cpp \ src/transactions/transaction.cpp \ src/transactions/transactions.cpp http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/Makefile.am 
b/modules/platforms/cpp/core/include/Makefile.am index 21d3062..0e9a7ec 100644 --- a/modules/platforms/cpp/core/include/Makefile.am +++ b/modules/platforms/cpp/core/include/Makefile.am @@ -18,45 +18,55 @@ ACLOCAL_AMFLAGS =-I m4 nobase_include_HEADERS = \ - ignite/ignite_configuration.h \ - ignite/ignite.h \ - ignite/impl/binary/binary_type_updater_impl.h \ - ignite/impl/operations.h \ - ignite/impl/ignite_environment.h \ - ignite/impl/ignite_impl.h \ - ignite/impl/cache/query/query_fields_row_impl.h \ - ignite/impl/cache/query/query_argument.h \ - ignite/impl/cache/query/query_impl.h \ - ignite/impl/cache/cache_impl.h \ - ignite/impl/cache/cache_entry_processor_holder.h \ - ignite/impl/cache/query/query_batch.h \ - ignite/impl/interop/interop_target.h \ - ignite/impl/interop/interop_external_memory.h \ - ignite/impl/handle_registry.h \ - ignite/impl/transactions/transaction_impl.h \ - ignite/impl/transactions/transactions_impl.h \ - ignite/impl/cluster/cluster_group_impl.h \ - ignite/impl/ignite_binding_impl.h \ - ignite/impl/module_manager.h \ - ignite/cache/query/query_fields_row.h \ + ignite/cache/cache.h \ + ignite/cache/cache_entry.h \ + ignite/cache/cache_entry_processor.h \ + ignite/cache/cache_peek_mode.h \ + ignite/cache/event/cache_entry_event.h \ + ignite/cache/event/cache_entry_event_filter.h \ + ignite/cache/event/cache_entry_event_listener.h \ + ignite/cache/mutable_cache_entry.h \ + ignite/cache/query/continuous/continuous_query.h \ + ignite/cache/query/continuous/continuous_query_handle.h \ + ignite/cache/query/query.h \ + ignite/cache/query/query_cursor.h \ ignite/cache/query/query_fields_cursor.h \ + ignite/cache/query/query_fields_row.h \ ignite/cache/query/query_scan.h \ - ignite/cache/query/query_cursor.h \ ignite/cache/query/query_sql.h \ - ignite/cache/query/query.h \ ignite/cache/query/query_sql_fields.h \ ignite/cache/query/query_text.h \ - ignite/cache/cache.h \ - ignite/cache/cache_entry.h \ - ignite/cache/cache_peek_mode.h \ - 
ignite/cache/cache_entry_processor.h \ - ignite/cache/mutable_cache_entry.h \ - ignite/ignition.h \ + ignite/ignite.h \ ignite/ignite_binding.h \ ignite/ignite_binding_context.h \ + ignite/ignite_configuration.h \ + ignite/ignition.h \ + ignite/impl/binary/binary_type_updater_impl.h \ + ignite/impl/bindings.h \ + ignite/impl/cache/cache_entry_processor_holder.h \ + ignite/impl/cache/cache_impl.h \ + ignite/impl/cache/event/cache_entry_event_filter_base.h \ + ignite/impl/cache/event/cache_entry_event_filter_holder.h \ + ignite/impl/cache/query/continuous/continuous_query_handle_impl.h \ + ignite/impl/cache/query/continuous/continuous_query_impl.h \ + ignite/impl/cache/query/query_argument.h \ + ignite/impl/cache/query/query_batch.h \ + ignite/impl/cache/query/query_fields_row_impl.h \ + ignite/impl/cache/query/query_impl.h \ + ignite/impl/cluster/cluster_group_impl.h \ + ignite/impl/handle_registry.h \ + ignite/impl/ignite_binding_impl.h \ + ignite/impl/ignite_environment.h \ + ignite/impl/ignite_impl.h \ + ignite/impl/interop/interop_external_memory.h \ + ignite/impl/interop/interop_target.h \ + ignite/impl/module_manager.h \ + ignite/impl/operations.h \ + ignite/impl/transactions/transactions_impl.h \ + ignite/impl/transactions/transaction_impl.h \ ignite/transactions/transaction.h \ - ignite/transactions/transaction_consts.h \ ignite/transactions/transactions.h \ + ignite/transactions/transaction_consts.h \ ignite/transactions/transaction_metrics.h uninstall-hook: http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/cache.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h index f9c442c..00d1c81 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/cache.h +++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h @@ -1485,8 +1485,11 @@ namespace ignite 
const query::continuous::ContinuousQuery<K, V>& qry, IgniteError& err) { using namespace impl::cache::query::continuous; + using namespace common::concurrent; - if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener()) + const SharedPointer<ContinuousQueryImpl<K, V> >& qryImpl = qry.impl; + + if (!qryImpl.IsValid() || !qryImpl.Get()->HasListener()) { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Event listener is not set for ContinuousQuery instance"); @@ -1494,11 +1497,7 @@ namespace ignite return query::continuous::ContinuousQueryHandle<K, V>(); } - ContinuousQueryHandleImpl* cqImpl; - cqImpl = impl.Get()->QueryContinuous(qry.impl, err); - - if (cqImpl) - cqImpl->SetQuery(qry.impl); + ContinuousQueryHandleImpl* cqImpl = impl.Get()->QueryContinuous(qryImpl, err); return query::continuous::ContinuousQueryHandle<K, V>(cqImpl); } @@ -1538,8 +1537,11 @@ namespace ignite const Q& initialQry, IgniteError& err) { using namespace impl::cache::query::continuous; + using namespace common::concurrent; - if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener()) + const SharedPointer<ContinuousQueryImpl<K, V> >& qryImpl = qry.impl; + + if (!qryImpl.IsValid() || !qryImpl.Get()->HasListener()) { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Event listener is not set for ContinuousQuery instance"); @@ -1547,11 +1549,7 @@ namespace ignite return query::continuous::ContinuousQueryHandle<K, V>(); } - ContinuousQueryHandleImpl* cqImpl; - cqImpl = impl.Get()->QueryContinuous(qry.impl, initialQry, err); - - if (cqImpl) - cqImpl->SetQuery(qry.impl); + ContinuousQueryHandleImpl* cqImpl = impl.Get()->QueryContinuous(qryImpl, initialQry, err); return query::continuous::ContinuousQueryHandle<K, V>(cqImpl); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h ---------------------------------------------------------------------- diff --git 
a/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h b/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h index 7fa1550..e0bb694 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h +++ b/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h @@ -42,17 +42,21 @@ namespace ignite * All templated types should be default-constructable, * copy-constructable and assignable. * - * @tparam P The processor itself which inherits from CacheEntryProcessor. * @tparam K Key type. * @tparam V Value type. * @tparam R Process method return type. * @tparam A Process method argument type. */ - template<typename P, typename K, typename V, typename R, typename A> + template<typename K, typename V, typename R, typename A> class CacheEntryProcessor { friend class ignite::IgniteBinding; + typedef A ArgumentType; + typedef K KeyType; + typedef V ValueType; + typedef R ReturnType; + public: /** * Destructor. @@ -70,40 +74,6 @@ namespace ignite * @return Processing result. */ virtual R Process(MutableCacheEntry<K, V>& entry, const A& arg) = 0; - - private: - /** - * Process input streaming data to produce output streaming data. - * - * Deserializes cache entry and processor using provided reader, invokes - * cache entry processor, gets result and serializes it using provided - * writer. - * - * @param reader Reader. - * @param writer Writer. 
- */ - static void InternalProcess(impl::binary::BinaryReaderImpl& reader, impl::binary::BinaryWriterImpl& writer) - { - typedef impl::cache::CacheEntryProcessorHolder<P, A> ProcessorHolder; - - ProcessorHolder procHolder = reader.ReadObject<ProcessorHolder>(); - - K key = reader.ReadObject<K>(); - - V value; - bool exists = reader.TryReadObject<V>(value); - - impl::cache::MutableCacheEntryState entryState; - - R res = procHolder.template Process<R, K, V>(key, value, exists, entryState); - - writer.WriteInt8(static_cast<int8_t>(entryState)); - - if (entryState == impl::cache::ENTRY_STATE_VALUE_SET) - writer.WriteTopObject(value); - - writer.WriteTopObject(res); - } }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h new file mode 100644 index 0000000..3a4fc74 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::cache::event::CacheEntryEventFilter class. + */ + +#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER +#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER + +#include <ignite/cache/event/cache_entry_event.h> +#include <ignite/impl/cache/event/cache_entry_event_filter_base.h> + +namespace ignite +{ + class IgniteBinding; + + namespace impl + { + namespace cache + { + namespace event + { + template<typename T> + class CacheEntryEventFilterHolder; + } + } + } + + namespace cache + { + namespace event + { + /** + * Cache entry event filter. + * + * All templated types should be default-constructable, + * copy-constructable and assignable. + * + * @tparam K Key type. + * @tparam V Value type. + */ + template<typename K, typename V> + class CacheEntryEventFilter : private impl::cache::event::CacheEntryEventFilterBase + { + template<typename T> + friend class impl::cache::event::CacheEntryEventFilterHolder; + + public: + /** + * Default constructor. + */ + CacheEntryEventFilter() + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntryEventFilter() + { + // No-op. + } + + /** + * Event callback. + * + * @param event Event. + * @return True if the event passes filter. + */ + virtual bool Process(const CacheEntryEvent<K, V>& event) = 0; + + private: + /** + * Process serialized events. + * + * @param reader Reader for a serialized event. + * @return Filter evaluation result. 
+ */ + virtual bool ReadAndProcessEvent(binary::BinaryRawReader& reader) + { + CacheEntryEvent<K, V> event; + + event.Read(reader); + + return Process(event); + } + }; + } + } +} + +#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h index 82bb125..0c1146b 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h +++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h @@ -24,7 +24,9 @@ #define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY #include <ignite/impl/cache/query/continuous/continuous_query_impl.h> + #include <ignite/cache/event/cache_entry_event_listener.h> +#include <ignite/cache/event/cache_entry_event_filter.h> namespace ignite { @@ -83,7 +85,7 @@ namespace ignite * continuous query execution has been started. */ ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr) : - impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr)) + impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, false)) { // No-op. } @@ -102,6 +104,37 @@ namespace ignite } /** + * Constructor. + * + * @param lsnr Event listener. Invoked on the node where + * continuous query execution has been started. + * @param remoteFilter Remote filter. + */ + template<typename F> + ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr, + const Reference<F>& remoteFilter) : + impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, false, remoteFilter)) + { + // No-op. + } + + /** + * Constructor. 
+ * + * @param lsnr Event listener. Invoked on the node where + * continuous query execution has been started. + * @param remoteFilter Remote filter. + * @param loc Whether query should be executed locally. + */ + template<typename F> + ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr, + const Reference<F>& remoteFilter, bool loc) : + impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, loc, remoteFilter)) + { + // No-op. + } + + /** * Set local flag. * * @param val Value of the flag. If true, query will be http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/ignite_binding.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/ignite_binding.h b/modules/platforms/cpp/core/include/ignite/ignite_binding.h index a8decf9..a84a1c1 100644 --- a/modules/platforms/cpp/core/include/ignite/ignite_binding.h +++ b/modules/platforms/cpp/core/include/ignite/ignite_binding.h @@ -22,6 +22,7 @@ #include <ignite/common/concurrent.h> #include <ignite/impl/ignite_binding_impl.h> +#include <ignite/impl/bindings.h> namespace ignite { @@ -53,12 +54,10 @@ namespace ignite } /** - * Register Type as Cache Entry Processor. + * Register type as Cache Entry Processor. * * Registred type should be a child of ignite::cache::CacheEntryProcessor * class. - * - * This method should only be used on the valid instance. */ template<typename P> void RegisterCacheEntryProcessor() @@ -76,8 +75,6 @@ namespace ignite * Registred type should be a child of ignite::cache::CacheEntryProcessor * class. * - * This method should only be used on the valid instance. - * * @param err Error. 
*/ template<typename P> @@ -87,7 +84,11 @@ namespace ignite impl::IgniteBindingImpl *im = impl.Get(); if (im) - im->RegisterCallback(bt.GetTypeId(), &P::CacheEntryProcessor::InternalProcess, err); + { + im->RegisterCallback(impl::IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY, + bt.GetTypeId(), impl::binding::ListenerApply<P, typename P::KeyType, + typename P::ValueType, typename P::ReturnType, typename P::ArgumentType>, err); + } else { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, @@ -96,6 +97,32 @@ namespace ignite } /** + * Register type as Cache Entry Event Filter. + * + * Registred type should be a child of ignite::cache::event::CacheEntryEventFilter + * class. + */ + template<typename F> + void RegisterCacheEntryEventFilter() + { + binary::BinaryType<F> bt; + impl::IgniteBindingImpl *im = impl.Get(); + + int32_t typeId = bt.GetTypeId(); + + if (im) + { + im->RegisterCallback(impl::IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE, + typeId, impl::binding::FilterCreate<F>); + } + else + { + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Instance is not usable (did you check for error?)."); + } + } + + /** * Check if the instance is valid. * * Invalid instance can be returned if some of the previous operations http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h b/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h index 1a6d26d..4d8a7a7 100644 --- a/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h +++ b/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h @@ -70,7 +70,7 @@ namespace ignite * @param cfg Configuration. * @param binding Binding. 
*/ - IgniteBindingContext(const IgniteConfiguration& cfg, IgniteBinding binding) : + IgniteBindingContext(const IgniteConfiguration& cfg, const IgniteBinding& binding) : cfg(cfg), binding(binding) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/bindings.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/bindings.h b/modules/platforms/cpp/core/include/ignite/impl/bindings.h new file mode 100644 index 0000000..ce77672 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/bindings.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_BINDINGS +#define _IGNITE_IMPL_BINDINGS + +#include <stdint.h> + +#include <ignite/impl/binary/binary_reader_impl.h> +#include <ignite/impl/ignite_environment.h> +#include <ignite/impl/cache/query/continuous/continuous_query_impl.h> +#include <ignite/impl/cache/cache_entry_processor_holder.h> + +namespace ignite +{ + namespace impl + { + namespace binding + { + /** + * Binding for filter creation. + * + * @tparam F The filter which inherits from CacheEntryEventFilter. + * + * @param reader Reader. 
+ * @param env Environment. + * @return Handle for the filter. + */ + template<typename F> + int64_t FilterCreate(binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl&, IgniteEnvironment& env) + { + using namespace common::concurrent; + using namespace cache::query::continuous; + + F filter = reader.ReadObject<F>(); + + SharedPointer<ContinuousQueryImplBase> qry(new RemoteFilterHolder(MakeReferenceFromCopy(filter))); + + return env.GetHandleRegistry().Allocate(qry); + } + + /** + * Process input streaming data to produce output streaming data. + * + * Deserializes cache entry and processor using provided reader, invokes + * cache entry processor, gets result and serializes it using provided + * writer. + * + * @param reader Reader. + * @param writer Writer. + */ + template<typename P, typename K, typename V, typename R, typename A> + int64_t ListenerApply(binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer, IgniteEnvironment&) + { + typedef cache::CacheEntryProcessorHolder<P, A> ProcessorHolder; + + ProcessorHolder procHolder = reader.ReadObject<ProcessorHolder>(); + + K key = reader.ReadObject<K>(); + + V value; + bool exists = reader.TryReadObject<V>(value); + + cache::MutableCacheEntryState entryState; + + R res = procHolder.template Process<R, K, V>(key, value, exists, entryState); + + writer.WriteInt8(static_cast<int8_t>(entryState)); + + if (entryState == cache::ENTRY_STATE_VALUE_SET) + writer.WriteTopObject(value); + + writer.WriteTopObject(res); + + return 0; + } + } + } +} + +#endif //_IGNITE_IMPL_BINDINGS http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h index 23b57c3..c979b4a 100644 --- 
a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h @@ -75,20 +75,6 @@ namespace ignite /** * Holder for the Cache Entry Processor and its argument. Used as a convenient way to * transmit Cache Entry Processor between nodes. - * - * Both key and value types should be default-constructable, - * copy-constructable and assignable. - * - * Additionally, for the processor class public methods with the - * following signatures should be defined: - * @code{.cpp} - * // Should return unique ID for every class. - * static int64_t GetJobId(); - * - * // Main processing method. Takes cache entry and argument and - * // returns processing result. - * R Process(ignite::cache::MutableCacheEntry<K, V>&, const A&); - * @endcode */ template<typename P, typename A> class CacheEntryProcessorHolder @@ -202,7 +188,6 @@ namespace ignite typedef impl::cache::CacheEntryProcessorHolder<P, A> UnderlyingType; IGNITE_BINARY_GET_FIELD_ID_AS_HASH - IGNITE_BINARY_GET_HASH_CODE_ZERO(UnderlyingType) IGNITE_BINARY_IS_NULL_FALSE(UnderlyingType) IGNITE_BINARY_GET_NULL_DEFAULT_CTOR(UnderlyingType) http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h index e6cfbab..4599522 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h @@ -22,9 +22,7 @@ #include <ignite/cache/query/query_sql.h> #include <ignite/cache/query/query_text.h> #include <ignite/cache/query/query_sql_fields.h> -#include <ignite/cache/query/continuous/continuous_query_handle.h> #include <ignite/impl/cache/query/query_impl.h> -#include 
<ignite/impl/cache/query/continuous/continuous_query_handle_impl.h> #include <ignite/impl/cache/query/continuous/continuous_query_impl.h> #include <ignite/impl/interop/interop_target.h> @@ -35,6 +33,15 @@ namespace ignite { namespace cache { + namespace query + { + namespace continuous + { + /* Forward declaration. */ + class ContinuousQueryHandleImpl; + } + } + /** * Cache implementation. */ @@ -402,30 +409,7 @@ namespace ignite * @param err Error. */ template<typename T> - query::QueryCursorImpl* QueryInternal(const T& qry, int32_t typ, IgniteError& err) - { - ignite::jni::java::JniErrorInfo jniErr; - - ignite::common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory(); - interop::InteropMemory* mem0 = mem.Get(); - interop::InteropOutputStream out(mem0); - binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); - ignite::binary::BinaryRawWriter rawWriter(&writer); - - qry.Write(rawWriter); - - out.Synchronize(); - - jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(), - typ, mem.Get()->PointerLong(), &jniErr); - - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); - - if (jniErr.code == ignite::java::IGNITE_JNI_ERR_SUCCESS) - return new query::QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef); - else - return 0; - } + query::QueryCursorImpl* QueryInternal(const T& qry, int32_t typ, IgniteError& err); /** * Start continuous query execution with the initial query. 
@@ -438,50 +422,7 @@ namespace ignite template<typename T> query::continuous::ContinuousQueryHandleImpl* QueryContinuous( const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry, - const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err) - { - jni::java::JniErrorInfo jniErr; - - common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory(); - interop::InteropMemory* mem0 = mem.Get(); - interop::InteropOutputStream out(mem0); - binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); - ignite::binary::BinaryRawWriter rawWriter(&writer); - - const query::continuous::ContinuousQueryImplBase& qry0 = *qry.Get(); - - int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry); - - rawWriter.WriteInt64(handle); - rawWriter.WriteBool(qry0.GetLocal()); - - // Filters are not supported for now. - rawWriter.WriteBool(false); - rawWriter.WriteNull(); - - rawWriter.WriteInt32(qry0.GetBufferSize()); - rawWriter.WriteInt64(qry0.GetTimeInterval()); - - // Autounsubscribe is a filter feature. - rawWriter.WriteBool(false); - - // Writing initial query. When there is not initial query writing -1. 
- rawWriter.WriteInt32(typ); - if (typ != -1) - initialQry.Write(rawWriter); - - out.Synchronize(); - - jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(), - cmd, mem.Get()->PointerLong(), &jniErr); - - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); - - if (jniErr.code == java::IGNITE_JNI_ERR_SUCCESS) - return new query::continuous::ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef); - - return 0; - } + const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err); }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h new file mode 100644 index 0000000..a0e1cb6 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE +#define _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE + +#include <ignite/binary/binary_raw_reader.h> + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace event + { + /** + * Base for the Cache Entry Event Filter. + */ + class CacheEntryEventFilterBase + { + public: + /** + * Default constructor. + */ + CacheEntryEventFilterBase() + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntryEventFilterBase() + { + // No-op. + } + + /** + * Process serialized events. + * + * @param reader Reader for a serialized event. + * @return Filter evaluation result. + */ + virtual bool ReadAndProcessEvent(ignite::binary::BinaryRawReader& reader) = 0; + }; + } + } + } +} + +#endif //_IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h new file mode 100644 index 0000000..4256f2b --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER +#define _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER + +#include <ignite/reference.h> + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace event + { + /* Forward declaration. */ + class CacheEntryEventFilterBase; + + class CacheEntryEventFilterHolderBase + { + public: + /** + * Destructor. + */ + virtual ~CacheEntryEventFilterHolderBase() + { + // No-op. + } + + /** + * Write. + * + * @param writer Writer. + */ + virtual void Write(binary::BinaryWriterImpl& writer) = 0; + + /** + * Get filter pointer. + * + * @return Filter. + */ + virtual CacheEntryEventFilterBase* GetFilter() = 0; + }; + + /** + * Holder for the Cache Entry Event Filter. + */ + template<typename F> + class CacheEntryEventFilterHolder : public CacheEntryEventFilterHolderBase + { + public: + typedef F FilterType; + + /** + * Default constructor. + */ + CacheEntryEventFilterHolder() : + filter() + { + // No-op. + } + + /** + * Constructor. + * + * @param filter Filter. + */ + CacheEntryEventFilterHolder(const Reference<FilterType>& filter) : + filter(filter) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntryEventFilterHolder() + { + // No-op. + } + + /** + * Process input. + * + * @param writer Writer. 
+ */ + virtual void Write(binary::BinaryWriterImpl& writer) + { + if (!filter.IsNull()) + { + writer.WriteBool(true); + writer.WriteObject<FilterType>(*filter.Get()); + } + else + { + writer.WriteBool(false); + writer.WriteNull(); + } + } + + /** + * Get filter pointer. + * + * @return Filter. + */ + virtual CacheEntryEventFilterBase* GetFilter() + { + return filter.Get(); + } + + private: + /** Stored filter. */ + Reference<FilterType> filter; + }; + + template<> + class CacheEntryEventFilterHolder<void> : public CacheEntryEventFilterHolderBase + { + public: + /** + * Default constructor. + */ + CacheEntryEventFilterHolder() + { + // No-op. + } + + /** + * Constructor. + */ + CacheEntryEventFilterHolder(const Reference<void>&) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntryEventFilterHolder() + { + // No-op. + } + + /** + * Process input. + * + * @param writer Writer. + */ + virtual void Write(binary::BinaryWriterImpl& writer) + { + writer.WriteBool(false); + writer.WriteNull(); + } + + /** + * Get filter pointer. + * + * @return Filter. 
+ */ + virtual CacheEntryEventFilterBase* GetFilter() + { + return 0; + } + }; + } + } + } +} + +#endif //_IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h index 75504b1..07facff 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h @@ -66,13 +66,6 @@ namespace ignite */ QueryCursorImpl* GetInitialQueryCursor(IgniteError& err); - /** - * Set query to keep pointer to. - * - * @param query Query. - */ - void SetQuery(SP_ContinuousQueryImplBase query); - private: /** Environment. */ SP_IgniteEnvironment env; @@ -83,9 +76,6 @@ namespace ignite /** Handle to Java object. */ jobject javaRef; - /** Shared pointer to query. Kept for query to live long enough. */ - SP_ContinuousQueryImplBase qry; - /** Mutex. 
*/ common::concurrent::CriticalSection mutex; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h index 2a24e5f..d2bf241 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h @@ -24,11 +24,13 @@ #define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL #include <stdint.h> +#include <memory> #include <ignite/reference.h> #include <ignite/cache/event/cache_entry_event_listener.h> #include <ignite/binary/binary_raw_reader.h> +#include <ignite/impl/cache/event/cache_entry_event_filter_holder.h> namespace ignite { @@ -80,10 +82,11 @@ namespace ignite * * @param loc Whether query should be executed locally. */ - explicit ContinuousQueryImplBase(bool loc) : + explicit ContinuousQueryImplBase(bool loc, event::CacheEntryEventFilterHolderBase* filterOp) : local(loc), bufferSize(DEFAULT_BUFFER_SIZE), - timeInterval(DEFAULT_TIME_INTERVAL) + timeInterval(DEFAULT_TIME_INTERVAL), + filterOp(filterOp) { // No-op. } @@ -183,6 +186,16 @@ namespace ignite } /** + * Get remote filter holder. + * + * @return Filter holder. + */ + event::CacheEntryEventFilterHolderBase& GetFilterHolder() const + { + return *filterOp; + } + + /** * Callback that reads and processes cache events. * * @param reader Reader to use. @@ -221,6 +234,9 @@ namespace ignite * sent only when buffer is full. */ int64_t timeInterval; + + /** Cache entry event filter holder. 
*/ + std::auto_ptr<event::CacheEntryEventFilterHolderBase> filterOp; }; /** @@ -252,11 +268,13 @@ namespace ignite /** * Constructor. * - * @param lsnr Event listener. Invoked on the node where + * @param lsnr Event listener Invoked on the node where * continuous query execution has been started. + * @param loc Whether query should be executed locally. */ - ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr) : - ContinuousQueryImplBase(false), + ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr, + bool loc) : + ContinuousQueryImplBase(loc, new event::CacheEntryEventFilterHolder<void>()), lsnr(lsnr) { // No-op. @@ -269,8 +287,10 @@ namespace ignite * continuous query execution has been started. * @param loc Whether query should be executed locally. */ - ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr, bool loc) : - ContinuousQueryImplBase(loc), + template<typename F> + ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr, + bool loc, const Reference<F>& filter) : + ContinuousQueryImplBase(loc, new event::CacheEntryEventFilterHolder<F>(filter)), lsnr(lsnr) { // No-op. @@ -335,13 +355,37 @@ namespace ignite for (int32_t i = 0; i < cnt; ++i) events[i].Read(reader); - lsnr.Get().OnEvent(events.data(), cnt); + lsnr.Get()->OnEvent(events.data(), cnt); } private: /** Cache entry event listener. */ Reference<ignite::cache::event::CacheEntryEventListener<K, V> > lsnr; }; + + /** + * Used to store filter on remote nodes where no + * ContinuousQuery instance were really created. + */ + class RemoteFilterHolder : public ContinuousQueryImplBase + { + public: + /** + * Constructor. + */ + template<typename F> + RemoteFilterHolder(const Reference<F>& filter): + ContinuousQueryImplBase(false, new event::CacheEntryEventFilterHolder<F>(filter)) + { + // No-op. 
+ } + + virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader&) + { + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "No listener is registered for the ContinuousQuery instance"); + } + }; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h index 32de2cb..7b20c50 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h @@ -18,6 +18,7 @@ #ifndef _IGNITE_IMPL_IGNITE_BINDING_IMPL #define _IGNITE_IMPL_IGNITE_BINDING_IMPL +#include <stdint.h> #include <map> #include <ignite/common/common.h> @@ -29,6 +30,9 @@ namespace ignite { namespace impl { + /* Forward declaration. */ + class IgniteEnvironment; + /** * Ignite binding implementation. * @@ -36,16 +40,24 @@ namespace ignite */ class IgniteBindingImpl { - typedef void (Callback)(binary::BinaryReaderImpl&, binary::BinaryWriterImpl&); + typedef int64_t(Callback)(binary::BinaryReaderImpl&, binary::BinaryWriterImpl&, IgniteEnvironment&); public: + enum CallbackType + { + CACHE_ENTRY_PROCESSOR_APPLY = 1, + + CACHE_ENTRY_FILTER_CREATE = 2, + + CACHE_ENTRY_FILTER_APPLY = 3, + }; + /** - * Default constructor. + * Constructor. + * + * @param env Environment. */ - IgniteBindingImpl() : callbacks() - { - // No-op. - } + IgniteBindingImpl(IgniteEnvironment &env); /** * Invoke callback using provided ID. @@ -53,31 +65,15 @@ namespace ignite * Deserializes data and callback itself, invokes callback and * serializes processing result using providede reader and writer. * - * @param id Processor ID. + * @param type Callback Type. + * @param id Callback ID. * @param reader Reader. * @param writer Writer. 
- * @return True if callback is registered and false otherwise. + * @param found Output param. True if callback was found and false otherwise. + * @return Callback return value. */ - bool InvokeCallbackById(int64_t id, binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer) - { - common::concurrent::CsLockGuard guard(lock); - - std::map<int64_t, Callback*>::iterator it = callbacks.find(id); - - if (it != callbacks.end()) - { - Callback* callback = it->second; - - // We have found callback and does not need lock here anymore. - guard.Reset(); - - callback(reader, writer); - - return true; - } - - return false; - } + IGNITE_IMPORT_EXPORT int64_t InvokeCallback(bool& found, int32_t type, int32_t id, binary::BinaryReaderImpl& reader, + binary::BinaryWriterImpl& writer); /** * Register cache entry processor and associate it with provided ID. @@ -85,29 +81,42 @@ namespace ignite * @throw IgniteError another processor is already associated with * the given ID. * - * @param id Identifier for processor to be associated with. - * @param proc Callback. + * @param type Callback type. + * @param id Callback identifier. + * @param callback Callback. + * @param err Error. */ - void RegisterCallback(int64_t id, Callback* proc, IgniteError& err) - { - common::concurrent::CsLockGuard guard(lock); - - bool inserted = callbacks.insert(std::make_pair(id, proc)).second; - - guard.Reset(); - - if (!inserted) - { - std::stringstream builder; + IGNITE_IMPORT_EXPORT void RegisterCallback(int32_t type, int32_t id, Callback* callback, IgniteError& err); + + /** + * Register cache entry processor and associate it with provided ID. + * + * @throw IgniteError another processor is already associated with + * the given ID. + * + * @param type Callback type. + * @param id Callback identifier. + * @param callback Callback. 
+ */ + IGNITE_IMPORT_EXPORT void RegisterCallback(int32_t type, int32_t id, Callback* callback); - builder << "Trying to register multiple PRC callbacks with the same ID. [id=" << id << ']'; + private: + IGNITE_NO_COPY_ASSIGNMENT(IgniteBindingImpl); - err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str()); - } + /** + * Make key out of callback's type and ID. + * + * @param type Callback Type. + * @param id Callback ID. + * @return Key for callback. + */ + int64_t makeKey(int32_t type, int32_t id) + { + return (static_cast<int64_t>(type) << 32) | id; } - private: - IGNITE_NO_COPY_ASSIGNMENT(IgniteBindingImpl); + /** Ignite environment. */ + IgniteEnvironment& env; /** Registered callbacks. */ std::map<int64_t, Callback*> callbacks; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h index 5fc9a27..e3cb859 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h @@ -21,19 +21,20 @@ #include <ignite/common/concurrent.h> #include <ignite/jni/java.h> #include <ignite/jni/utils.h> -#include <ignite/ignite_binding_context.h> #include <ignite/ignite_configuration.h> -#include "ignite/impl/interop/interop_memory.h" -#include "ignite/impl/binary/binary_type_manager.h" -#include "ignite/impl/handle_registry.h" -#include "ignite/impl/module_manager.h" -#include "ignite/impl/ignite_binding_impl.h" +#include <ignite/impl/interop/interop_memory.h> +#include <ignite/impl/binary/binary_type_manager.h> +#include <ignite/impl/handle_registry.h> namespace ignite { namespace impl { + /* Forward declarations. 
*/ + class IgniteBindingImpl; + class ModuleManager; + /** * Defines environment in which Ignite operates. */ @@ -110,6 +111,21 @@ namespace ignite void OnContinuousQueryListenerApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem); /** + * Continuous query filter create callback. + * + * @param mem Memory with data. + * @return Filter handle. + */ + int64_t OnContinuousQueryFilterCreate(common::concurrent::SharedPointer<interop::InteropMemory>& mem); + + /** + * Continuous query filter apply callback. + * + * @param mem Memory with data. + */ + int64_t OnContinuousQueryFilterApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem); + + /** * Cache Invoke callback. * * @param mem Input-output memory. @@ -191,14 +207,7 @@ namespace ignite * * @return IgniteBinding instance. */ - IgniteBinding GetBinding() const; - - /** - * Get binding context. - * - * @return Binding context. - */ - IgniteBindingContext GetBindingContext() const; + common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding() const; private: /** Node configuration. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h index 24fc989..5b1f527 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h @@ -22,10 +22,10 @@ #include <ignite/jni/java.h> #include <ignite/common/utils.h> -#include "ignite/impl/cache/cache_impl.h" -#include "ignite/impl/transactions/transactions_impl.h" -#include "ignite/impl/cluster/cluster_group_impl.h" -#include "ignite/impl/ignite_environment.h" +#include <ignite/impl/cache/cache_impl.h> +#include <ignite/impl/transactions/transactions_impl.h> +#include <ignite/impl/cluster/cluster_group_impl.h> +#include <ignite/impl/ignite_environment.h> namespace ignite { @@ -154,7 +154,7 @@ namespace ignite * * @return IgniteBinding class instance. */ - IgniteBinding GetBinding(); + common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding(); /** * Get instance of the implementation from the proxy class. http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/operations.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h index dfaa4e8..fff8a86 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/operations.h +++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h @@ -79,7 +79,7 @@ namespace ignite } private: /** Value. 
*/ - const T val; + const T& val; IGNITE_NO_COPY_ASSIGNMENT(In1Operation) }; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/project/vs/core.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj index b490887..b5a95bd 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj @@ -195,6 +195,7 @@ <ClInclude Include="..\..\include\ignite\cache\cache_entry_processor.h" /> <ClInclude Include="..\..\include\ignite\cache\cache_peek_mode.h" /> <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h" /> + <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_filter.h" /> <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h" /> <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h" /> <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h" /> @@ -212,9 +213,12 @@ <ClInclude Include="..\..\include\ignite\ignite_configuration.h" /> <ClInclude Include="..\..\include\ignite\ignition.h" /> <ClInclude Include="..\..\include\ignite\impl\binary\binary_type_updater_impl.h" /> + <ClInclude Include="..\..\include\ignite\impl\bindings.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\cache_entry_processor_holder.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\query\query_argument.h" /> + <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_base.h" /> + <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_holder.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h" /> <ClInclude 
Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h" /> @@ -246,6 +250,7 @@ <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" /> <ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" /> <ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp" /> + <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp" /> <ClCompile Include="..\..\src\impl\ignite_environment.cpp" /> <ClCompile Include="..\..\src\impl\ignite_impl.cpp" /> <ClCompile Include="..\..\src\impl\handle_registry.cpp" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/project/vs/core.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters index b75b3b2..3b17d53 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters @@ -52,6 +52,9 @@ <ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp"> <Filter>Code\impl\cluster</Filter> </ClCompile> + <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp"> + <Filter>Code\impl</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h"> @@ -192,6 +195,18 @@ <ClInclude Include="..\..\include\ignite\impl\cache\query\query_argument.h"> <Filter>Code\impl\cache\query</Filter> </ClInclude> + <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_filter.h"> + <Filter>Code\cache\event</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_base.h"> + <Filter>Code\impl\cache\event</Filter> + </ClInclude> + <ClInclude 
Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_holder.h"> + <Filter>Code\impl\cache\event</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\impl\bindings.h"> + <Filter>Code\impl</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <Filter Include="Code"> @@ -236,5 +251,8 @@ <Filter Include="Code\impl\cluster"> <UniqueIdentifier>{f5b54635-91a1-447e-923a-1b4608d7e5bc}</UniqueIdentifier> </Filter> + <Filter Include="Code\impl\cache\event"> + <UniqueIdentifier>{9c5e9732-755a-4553-8926-b4cf3b6abaf3}</UniqueIdentifier> + </Filter> </ItemGroup> </Project> \ No newline at end of file
