http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientCallbackArgN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientCallbackArgN.cs b/clicache/integration-test/ThinClientCallbackArgN.cs new file mode 100644 index 0000000..8d8ff5b --- /dev/null +++ b/clicache/integration-test/ThinClientCallbackArgN.cs @@ -0,0 +1,726 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; + +namespace Apache.Geode.Client.UnitTests +{ + using NUnit.Framework; + using Apache.Geode.DUnitFramework; + using Apache.Geode.Client.Tests; + + using Apache.Geode.Client; + + using GIRegion = Apache.Geode.Client.IRegion<int, object>; + using System.Collections.Generic; + + public class CallbackListener : CacheListenerAdapter<int, object> + { + int m_creates; + int m_updates; + int m_invalidates; + int m_destroys; + int m_regionInvalidate; + int m_regionDestroy; + int m_regionClear; + object m_callbackArg; + + #region Getters + + public int Creates + { + get { return m_creates; } + } + + public int Updates + { + get { return m_updates; } + } + + public int Invalidates + { + get { return m_invalidates; } + } + + public int Destroys + { + get { return m_destroys; } + } + + public int RegionInvalidates + { + get { return m_regionInvalidate; } + } + + public int RegionDestroys + { + get { return m_regionDestroy; } + } + public int RegionClear + { + get { return m_regionClear; } + } + public object CallbackArg + { + get { return m_callbackArg; } + } + #endregion + + public void SetCallbackArg(object callbackArg) + { + m_callbackArg = callbackArg; + } + + private void check(object eventCallback, ref int updateCount) + { + Log.Fine("check.."); + if (eventCallback != null) + { + string callbackArg = eventCallback as string; + + if (callbackArg != null) + { + string cs = m_callbackArg as string; + if (cs != null) + { + if (callbackArg == cs) + { + Log.Fine("value matched"); + updateCount++; + } + else + Log.Fine("value matched NOT"); + } + } + else + { + Log.Fine("Callbackarg is not cacheable string"); + Portfolio pfCallback = eventCallback as Portfolio; + if (pfCallback != null) + { + Portfolio pf = m_callbackArg as Portfolio; + if (pf != null) + { + if (pf.Pkid == pfCallback.Pkid && pfCallback.ArrayNull == null + && pfCallback.ArrayZeroSize != null && pfCallback.ArrayZeroSize.Length == 0) + { + Log.Fine("value matched"); + updateCount++; + } + } + } + } + } + } + + private void checkCallbackArg(EntryEvent<int, object> entryEvent, ref int updateCount) + { + check(entryEvent.CallbackArgument, ref updateCount); + } + + private void checkCallbackArg(RegionEvent<int, object> regionEvent, ref int updateCount) + { + check(regionEvent.CallbackArgument, ref updateCount); + } + + #region CacheListener Members + + public override void AfterCreate(EntryEvent<int, object> ev) + { + checkCallbackArg(ev, ref m_creates); + } + + public override void AfterUpdate(EntryEvent<int, object> ev) + { + checkCallbackArg(ev, ref m_updates); + } + + public override void AfterInvalidate(EntryEvent<int, object> ev) + { + checkCallbackArg(ev, ref m_invalidates); + } + + public override void AfterDestroy(EntryEvent<int, object> ev) + { + checkCallbackArg(ev, ref m_destroys); + } + + public override void AfterRegionInvalidate(RegionEvent<int, object> rev) + { + checkCallbackArg(rev, ref m_regionInvalidate); + } + + public override void AfterRegionDestroy(RegionEvent<int, object> rev) + { + checkCallbackArg(rev, ref m_regionDestroy); + } + public override void AfterRegionClear(RegionEvent<int, object> rev) + { + checkCallbackArg(rev, ref m_regionClear); + } + + #endregion + } + + [TestFixture] + [Category("generics")] + public class ThinClientCallbackArg : ThinClientRegionSteps + { + private TallyWriter<int, object> m_writer; + private TallyListener<int, object> m_listener; + private CallbackListener m_callbackListener; + RegionOperation o_region; + private UnitProcess m_client1, m_client2; + int key0 = 12; + object m_callbackarg = "Gemstone's Callback"; + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2 }; + } + + public void CreateRegion(string locators, + bool caching, bool listener, bool writer) + { + if (listener) + { + m_listener = new TallyListener<int, object>(); + + } + else + { + m_listener = null; + } + GIRegion region = null; + region = CacheHelper.CreateTCRegion_Pool<int, object>(RegionName, true, caching, + m_listener, locators, "__TESTPOOL1_", true); + if (listener) + m_listener.SetCallBackArg(key0); + + if (writer) + { + m_writer = new TallyWriter<int, object>(); + + } + else + { + m_writer = null; + } + if (writer) + { + AttributesMutator<int, object> at = region.AttributesMutator; + at.SetCacheWriter(m_writer); + m_writer.SetCallBackArg(key0); + } + } + + public void CreateRegion2(string locators, + bool caching, bool listener, bool writer) + { + CallbackListener callbackLis = null; + if (listener) + { + m_callbackListener = new CallbackListener(); + m_callbackListener.SetCallbackArg(m_callbackarg); + callbackLis = m_callbackListener; + } + else + { + m_listener = null; + } + GIRegion region = null; + region = CacheHelper.CreateTCRegion_Pool<int, object>(RegionName, true, caching, + callbackLis, locators, "__TESTPOOL1_", true); + } + + public void ValidateLocalListenerWriterData() + { + Thread.Sleep(2000); + Assert.AreEqual(true, m_writer.IsWriterInvoked, "Writer should be invoked"); + Assert.AreEqual(true, m_listener.IsListenerInvoked, "Listener should be invoked"); + Assert.AreEqual(true, m_writer.IsCallBackArgCalled, "Writer CallbackArg should be invoked"); + Assert.AreEqual(true, m_listener.IsCallBackArgCalled, "Listener CallbackArg should be invoked"); + m_listener.ShowTallies(); + m_writer.ShowTallies(); + } + + public void ValidateEvents() + { + Assert.AreEqual(15, m_writer.Creates, "Should be 10 creates"); + Assert.AreEqual(15, m_listener.Creates, "Should be 10 creates"); + Assert.AreEqual(15, m_writer.Updates, "Should be 5 updates"); + Assert.AreEqual(15, m_listener.Updates, "Should be 5 updates"); + Assert.AreEqual(0, m_writer.Invalidates, "Should be 0 invalidates"); + Assert.AreEqual(5, m_listener.Invalidates, "Should be 5 invalidates"); + Assert.AreEqual(10, m_writer.Destroys, "Should be 10 destroys"); // 5 destroys + 5 removes + Assert.AreEqual(10, m_listener.Destroys, "Should be 10 destroys"); // 5 destroys + 5 removes + } + + public void CallOp() + { + o_region = new RegionOperation(RegionName); + o_region.PutOp(5, key0); + Thread.Sleep(1000); // let the events reach at other end. + o_region.PutOp(5, key0); + Thread.Sleep(1000); + o_region.InvalidateOp(5, key0); + Thread.Sleep(1000); + o_region.DestroyOp(5, key0); + Thread.Sleep(1000); // let the events reach at other end. + o_region.PutOp(5, key0); + Thread.Sleep(1000); + o_region.RemoveOp(5, key0); + Thread.Sleep(1000); + } + + void RegisterPdxType8() + { + Serializable.RegisterPdxType(PdxTests.PdxTypes8.CreateDeserializable); + } + + void runCallbackArgTest() + { + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer"); + m_client1.Call(CreateRegion, CacheHelper.Locators, + true, true, true); + m_client1.Call(RegisterAllKeys, new string[] { RegionName }); + + Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer"); + m_client2.Call(CreateRegion, CacheHelper.Locators, + true, true, true); + m_client2.Call(RegisterAllKeys, new string[] { RegionName }); + + m_client2.Call(RegisterPdxType8); + + m_client1.Call(CallOp); + + + m_client1.Call(ValidateLocalListenerWriterData); + m_client1.Call(ValidateEvents); + + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearLocators(); + CacheHelper.ClearEndpoints(); + } + + private bool m_isSet = false; + public void SetCallbackArg() + { + if (!m_isSet) + { + m_isSet = true; + m_callbackarg = new Portfolio(1, 1); + //TODO:;split + Serializable.RegisterTypeGeneric(Portfolio.CreateDeserializable, CacheHelper.DCache); + Serializable.RegisterTypeGeneric(Position.CreateDeserializable, CacheHelper.DCache); + } + } + + public void TestCreatesAndUpdates() + { + o_region = new RegionOperation(RegionName); + o_region.Region.Add("Key-1", "Val-1", m_callbackarg); + o_region.Region.Put("Key-1", "NewVal-1", m_callbackarg); + Thread.Sleep(10000); + } + + public void TestInvalidates() + { + o_region = new RegionOperation(RegionName); + o_region.Region.GetLocalView().Add(1234, 1234, m_callbackarg); + o_region.Region.GetLocalView().Add(12345, 12345, m_callbackarg); + o_region.Region.GetLocalView().Add(12346, 12346, m_callbackarg); + o_region.Region.GetLocalView().Put(1234, "Val-1", m_callbackarg); + o_region.Region.GetLocalView().Invalidate(1234, m_callbackarg); + Assert.AreEqual(o_region.Region.GetLocalView().Remove(12345, 12345, m_callbackarg), true, "Result of remove should be true, as this value exists locally."); + Assert.AreEqual(o_region.Region.GetLocalView().ContainsKey(12345), false, "containsKey should be false"); + Assert.AreEqual(o_region.Region.GetLocalView().Remove(12346, m_callbackarg), true, "Result of remove should be true, as this value exists locally."); + Assert.AreEqual(o_region.Region.GetLocalView().ContainsKey(12346), false, "containsKey should be false"); + o_region.Region.Invalidate("Key-1", m_callbackarg); + o_region.Region.InvalidateRegion(m_callbackarg); + } + + public void TestDestroy() + { + o_region = new RegionOperation(RegionName); + o_region.Region.Remove("Key-1", m_callbackarg); + //o_region.Region.DestroyRegion(m_callbackarg); + } + + public void TestRemove() + { + o_region = new RegionOperation(RegionName); + o_region.Region.Remove("Key-1", "NewVal-1", m_callbackarg); + o_region.Region.DestroyRegion(m_callbackarg); + } + + public void TestlocalClear() + { + o_region = new RegionOperation(RegionName); + o_region.Region.GetLocalView().Clear(m_callbackarg); + } + public void TestValidate() + { + Thread.Sleep(10000); + Assert.AreEqual(5, m_callbackListener.Creates, "Should be 5 creates"); + Assert.AreEqual(3, m_callbackListener.Updates, "Should be 3 update"); + Assert.AreEqual(2, m_callbackListener.Invalidates, "Should be 2 invalidate"); + Assert.AreEqual(4, m_callbackListener.Destroys, "Should be 4 destroy"); + Assert.AreEqual(1, m_callbackListener.RegionInvalidates, "Should be 1 region invalidates"); + Assert.AreEqual(1, m_callbackListener.RegionDestroys, "Should be 1 regiondestroy"); + Assert.AreEqual(1, m_callbackListener.RegionClear, "Should be 1 RegionClear"); + } + + void runCallbackArgTest2(int callbackArgChange) + { + if (callbackArgChange == 1) + { + //change now custom type + m_callbackarg = new Portfolio(1, 1); + m_client1.Call(SetCallbackArg); + m_client2.Call(SetCallbackArg); + } + + m_callbackListener = new CallbackListener(); + m_callbackListener.SetCallbackArg(m_callbackarg); + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription5N.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer"); + m_client1.Call(CreateRegion2, CacheHelper.Locators, + true, true, false); + m_client1.Call(RegisterAllKeys, new string[] { RegionName }); + + Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer"); + m_client2.Call(CreateRegion2, CacheHelper.Locators, + true, false, false); + + m_client2.Call(TestCreatesAndUpdates); + m_client1.Call(TestInvalidates); + m_client2.Call(TestDestroy); + m_client2.Call(TestCreatesAndUpdates); + m_client1.Call(TestlocalClear); + m_client2.Call(TestRemove); + m_client1.Call(TestValidate); + + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearLocators(); + CacheHelper.ClearEndpoints(); + } + + private bool isRegistered = false; + public void registerDefaultCacheableType() + { + if (!isRegistered) + { + Serializable.RegisterTypeGeneric(DefaultType.CreateDeserializable, CacheHelper.DCache); + isRegistered = true; + } + } + + + public void CallOp2() + { + o_region = new RegionOperation(RegionName); + DefaultType dc = new DefaultType(true); + o_region.Region.Put("key-1", dc, null); + Thread.Sleep(1000); // let the events reach at other end. + } + + public void ValidateData() + { + o_region = new RegionOperation(RegionName); + DefaultType dc = (DefaultType)o_region.Region.Get("key-1", null); + + Assert.AreEqual(dc.CBool, true, "bool is not equal"); + Assert.AreEqual(dc.CInt, 1000, "int is not equal"); + + int[] cia = dc.CIntArray; + Assert.IsNotNull(cia, "Int array is null"); + Assert.AreEqual(3, cia.Length, "Int array are not three"); + + string[] csa = dc.CStringArray; + Assert.IsNotNull(csa, "String array is null"); + Assert.AreEqual(2, csa.Length, "String array length is not two"); + + Assert.AreEqual(dc.CFileName, "geode.txt", "Cacheable filename is not equal"); + + /* + Assert.IsNotNull(dc.CHashSet, "hashset is null"); + Assert.AreEqual(2, dc.CHashSet.Count, "hashset size is not two"); + * */ + + Assert.IsNotNull(dc.CHashMap, "hashmap is null"); + Assert.AreEqual(1, dc.CHashMap.Count, "hashmap size is not one"); + + //Assert.IsNotNull(dc.CDate, "Date is null"); + + Assert.IsNotNull(dc.CVector); + Assert.AreEqual(2, dc.CVector.Count, "Vector size is not two"); + + //Assert.IsNotNull(dc.CObject); + //Assert.AreEqual("key", ((CustomSerializableObject)dc.CObject).key, "Object key is not same"); + //Assert.AreEqual("value", ((CustomSerializableObject)dc.CObject).value, "Object value is not same"); + } + + void runCallbackArgTest3() + { + + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription6.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer"); + m_client1.Call(CreateRegion, CacheHelper.Locators, + true, false, false); + // m_client1.Call(RegisterAllKeys, new string[] { RegionName }); + + Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer"); + m_client2.Call(CreateRegion, CacheHelper.Locators, + true, false, false); + + m_client1.Call(registerDefaultCacheableType); + m_client2.Call(registerDefaultCacheableType); + + m_client2.Call(RegisterAllKeys, new string[] { RegionName }); + + m_client1.Call(CallOp2); + + m_client2.Call(ValidateData); + + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearLocators(); + CacheHelper.ClearEndpoints(); + } + + public void TestRemoveAll() + { + o_region = new RegionOperation(RegionName); + ICollection<object> keys = new LinkedList<object>(); + for(int i =0; i< 10; i++) + { + o_region.Region["Key-"+i] = "Value-"+i; + keys.Add("Key-" + i); + } + o_region.Region.RemoveAll(keys, m_callbackarg); + } + + public void TestPutAll() + { + o_region = new RegionOperation(RegionName); + Dictionary<Object, Object> entryMap = new Dictionary<Object, Object>(); + for (Int32 item = 0; item < 10; item++) + { + int K = item; + string value = item.ToString(); + entryMap.Add(K, value); + } + o_region.Region.PutAll(entryMap, 15, m_callbackarg); + } + + public void TestGetAll() + { + o_region = new RegionOperation(RegionName); + List<Object> keys = new List<Object>(); + for (int item = 0; item < 10; item++) + { + Object K = item; + keys.Add(K); + } + Dictionary<Object, Object> values = new Dictionary<Object, Object>(); + o_region.Region.GetAll(keys.ToArray(), values, null, true, m_callbackarg); + + Dictionary<Object, Object>.Enumerator enumerator = values.GetEnumerator(); + while (enumerator.MoveNext()) + { + Util.Log("Values after getAll with CallBack Key = {0} Value = {1} ", enumerator.Current.Key.ToString(), enumerator.Current.Value.ToString()); + } + } + + public void TestValidateRemoveAllCallback() + { + Thread.Sleep(10000); + Assert.AreEqual(10, m_callbackListener.Destroys, "Should be 10 destroy"); + } + + public void TestValidatePutAllCallback() + { + Thread.Sleep(10000); + Assert.AreEqual(10, m_callbackListener.Creates, "Should be 10 creates"); + Assert.AreEqual("Gemstone's Callback", m_callbackListener.CallbackArg, "CallBackArg for putAll should be same"); + } + + void runPutAllCallbackArgTest() + { + m_callbackListener = new CallbackListener(); + m_callbackListener.SetCallbackArg(m_callbackarg); + + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription5N.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer"); + m_client1.Call(CreateRegion2, CacheHelper.Locators, + true, true, false); + m_client1.Call(RegisterAllKeys, new string[] { RegionName }); + Util.Log("RegisterAllKeys completed.."); + + Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer"); + m_client2.Call(CreateRegion2, CacheHelper.Locators, + true, false, false); + Util.Log("CreateRegion2 completed.."); + + m_client2.Call(TestPutAll); + Util.Log("TestPutAll completed.."); + m_client1.Call(TestValidatePutAllCallback); + Util.Log("TestValidatePutAllCallback completed.."); + m_client2.Call(TestGetAll); + Util.Log("TestGetAll completed.."); + + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearLocators(); + CacheHelper.ClearEndpoints(); + } + + void runRemoveAllCallbackArgTest() + { + + m_callbackListener = new CallbackListener(); + m_callbackListener.SetCallbackArg(m_callbackarg); + CacheHelper.SetupJavaServers(true, "cacheserver_notify_subscription5N.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + Util.Log("Creating region in client1, no-ack, cache-enabled, with listener and writer"); + m_client1.Call(CreateRegion2, CacheHelper.Locators, + true, true, false); + m_client1.Call(RegisterAllKeys, new string[] { RegionName }); + Util.Log("RegisterAllKeys completed.."); + + Util.Log("Creating region in client2 , no-ack, cache-enabled, with listener and writer"); + m_client2.Call(CreateRegion2,CacheHelper.Locators, + true, false, false); + Util.Log("CreateRegion2 completed.."); + + m_client2.Call(TestRemoveAll); + Util.Log("TestRemoveAll completed.."); + m_client1.Call(TestValidateRemoveAllCallback); + Util.Log("TestValidateRemoveAllCallback completed.."); + + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearLocators(); + CacheHelper.ClearEndpoints(); + } + + [TearDown] + public override void EndTest() + { + base.EndTest(); + } + + [Test] + public void ThinClientCallbackArgTest() + { + runCallbackArgTest(); + } + + [Test] + public void ThinClientCallbackArgTest2() + { + for (int i = 0; i < 2; i++) + { + runCallbackArgTest2(i); + } + } + + [Test] + public void ThinClientCallbackArgTest3() + { + runCallbackArgTest3(); + } + + [Test] + public void RemoveAllCallbackArgTest() + { + runRemoveAllCallbackArgTest(); + } + + [Test] + public void PutAllCallbackArgTest() + { + runPutAllCallbackArgTest(); + } + } +}
http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientConflationTestsN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientConflationTestsN.cs b/clicache/integration-test/ThinClientConflationTestsN.cs new file mode 100644 index 0000000..0672520 --- /dev/null +++ b/clicache/integration-test/ThinClientConflationTestsN.cs @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Apache.Geode.Client.UnitTests +{ + using NUnit.Framework; + using Apache.Geode.DUnitFramework; + using Apache.Geode.Client.Tests; + using Apache.Geode.Client; + + #region Listener + class ConflationListner<TKey, TValue> : ICacheListener<TKey, TValue> + { + #region Private members + + private int m_events = 0; + private TValue m_value = default(TValue); + #endregion + + public static ConflationListner<TKey, TValue> Create() + { + Util.Log(" ConflationListner Created"); + + return new ConflationListner<TKey, TValue>(); + } + + private void check(EntryEvent<TKey, TValue> ev) + { + m_events++; + TKey key = ev.Key; + TValue value = ev.NewValue; + m_value = value; + Util.Log("Region:{0}:: Key:{1}, Value:{2}", ev.Region.Name, key, value); + + } + + public void validate(bool conflation) + { + if (conflation) + { + string msg1 = string.Format("Conflation On: Expected 2 events but got {0}", m_events); + Assert.AreEqual(2, m_events, msg1); + } + else + { + string msg2 = string.Format("Conflation Off: Expected 5 events but got {0}", m_events); + Assert.AreEqual(5, m_events, msg2); + } + + string msg3 = string.Format("Expected Value =5, Actual = {0}", m_value); + Assert.AreEqual(5, m_value, msg3); + } + + #region ICacheListener Members + + public virtual void AfterCreate(EntryEvent<TKey, TValue> ev) + { + check(ev); + } + + public virtual void AfterUpdate(EntryEvent<TKey, TValue> ev) + { + check(ev); + } + + public virtual void AfterDestroy(EntryEvent<TKey, TValue> ev) { } + + public virtual void AfterInvalidate(EntryEvent<TKey, TValue> ev) { } + + public virtual void AfterRegionDestroy(RegionEvent<TKey, TValue> ev) { } + + public virtual void AfterRegionInvalidate(RegionEvent<TKey, TValue> ev) { } + + public virtual void AfterRegionClear(RegionEvent<TKey, TValue> ev) { } + + public virtual void AfterRegionLive(RegionEvent<TKey, TValue> ev) + { + Util.Log("DurableListener: Received AfterRegionLive event of region: {0}", ev.Region.Name); + } + + public virtual void Close(IRegion<TKey, TValue> region) { } + public virtual void AfterRegionDisconnected(IRegion<TKey, TValue> region) { } + + #endregion + } + #endregion + + [TestFixture] + [Category("group1")] + [Category("unicast_only")] + [Category("generics")] + + public class ThinClientConflationTests : ThinClientRegionSteps + { + #region Private members + + private UnitProcess m_client1, m_client2, m_feeder; + private string[] keys = { "Key-1", "Key-2", "Key-3", "Key-4", "Key-5" }; + + private static string[] DurableClientIds = { "DurableClientId1", "DurableClientId2" }; + + static string[] Regions = { "ConflatedRegion", "NonConflatedRegion" }; + + private static ConflationListner<object, object> m_listener1C1, m_listener2C1, m_listener1C2, m_listener2C2; + + #endregion + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + m_feeder = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2, m_feeder }; + } + + [TearDown] + public override void EndTest() + { + try + { + m_client1.Call(CacheHelper.Close); + m_client2.Call(CacheHelper.Close); + m_feeder.Call(CacheHelper.Close); + CacheHelper.ClearEndpoints(); + } + finally + { + CacheHelper.StopJavaServers(); + } + base.EndTest(); + } + + #region Common Functions + + public void InitFeeder(string locators, int redundancyLevel) + { + CacheHelper.CreatePool<object, object>("__TESTPOOL1_", locators, (string)null, redundancyLevel, false); + CacheHelper.CreateTCRegion_Pool<object, object>(Regions[0], false, false, null, + CacheHelper.Locators, "__TESTPOOL1_", false); + CacheHelper.CreateTCRegion_Pool<object, object>(Regions[1], false, false, null, + CacheHelper.Locators, "__TESTPOOL1_", false); + } + + public void InitDurableClient(int client, string locators, string conflation) + { + // Create DurableListener for first time and use same afterward. + ConflationListner<object, object> checker1 = null; + ConflationListner<object, object> checker2 = null; + string durableId = ThinClientConflationTests.DurableClientIds[client - 1]; + if (client == 1) + { + ThinClientConflationTests.m_listener1C1 = ConflationListner<object, object>.Create(); + ThinClientConflationTests.m_listener2C1 = ConflationListner<object, object>.Create(); + checker1 = ThinClientConflationTests.m_listener1C1; + checker2 = ThinClientConflationTests.m_listener2C1; + } + else // client == 2 + { + ThinClientConflationTests.m_listener1C2 = ConflationListner<object, object>.Create(); + ThinClientConflationTests.m_listener2C2 = ConflationListner<object, object>.Create(); + checker1 = ThinClientConflationTests.m_listener1C2; + checker2 = ThinClientConflationTests.m_listener2C2; + } + CacheHelper.InitConfigForConflation_Pool(locators, durableId, conflation); + CacheHelper.CreateTCRegion_Pool<object, object>(Regions[0], false, true, checker1, + CacheHelper.Locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(Regions[1], false, true, checker2, + CacheHelper.Locators, "__TESTPOOL1_", true); + + //CacheHelper.DCache.ReadyForEvents(); + + IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(Regions[0]); + region1.GetSubscriptionService().RegisterAllKeys(true); + IRegion<object, object> region2 = CacheHelper.GetVerifyRegion<object, object>(Regions[1]); + region2.GetSubscriptionService().RegisterAllKeys(true); + } + + public void ReadyForEvents() + { + CacheHelper.DCache.ReadyForEvents(); + } + + public void FeederUpdate(int keyIdx) + { + IRegion<object, object> region1 = CacheHelper.GetVerifyRegion<object, object>(Regions[0]); + + + region1[keys[keyIdx]] = 1; + region1[keys[keyIdx]] = 2; + region1[keys[keyIdx]] = 3; + region1[keys[keyIdx]] = 4; + region1[keys[keyIdx]] = 5; + + IRegion<object, object> region2 = CacheHelper.GetVerifyRegion<object, object>(Regions[1]); + + region2[keys[keyIdx]] = 1; + region2[keys[keyIdx]] = 2; + region2[keys[keyIdx]] = 3; + region2[keys[keyIdx]] = 4; + region2[keys[keyIdx]] = 5; + } + + public void ClientDown() + { + CacheHelper.Close(); + } + + + public void KillServer() + { + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + } + + public delegate void KillServerDelegate(); + + #endregion + + + public void Validate(int client, int region, bool conflate) + { + ConflationListner<object, object> checker = null; + if (client == 1) + { + if (region == 1) + checker = ThinClientConflationTests.m_listener1C1; + else + checker = ThinClientConflationTests.m_listener2C1; + } + else // client == 2 + { + if (region == 1) + checker = ThinClientConflationTests.m_listener1C2; + else + checker = ThinClientConflationTests.m_listener2C2; + } + + if (checker != null) + { + checker.validate(conflate); + } + else + { + Assert.Fail("Checker is NULL!"); + } + } + + void runConflationBasic() + { + CacheHelper.SetupJavaServers(true, "cacheserver_conflation.xml"); + + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_feeder.Call(InitFeeder, CacheHelper.Locators, 0); + Util.Log("Feeder initialized."); + + //Test "true" and "false" settings + m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, "true"); + m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, "false"); + Util.Log("Clients initialized for first time."); + + m_feeder.Call(FeederUpdate, 0); + Util.Log("Feeder performed first update."); + + Util.Log("Client1 sending readyForEvents()."); + m_client1.Call(ReadyForEvents); + Thread.Sleep(5000); + Util.Log("Validating Client 1 Region 1."); + m_client1.Call(Validate, 1, 1, true); + Util.Log("Validating Client 1 Region 2."); + m_client1.Call(Validate, 1, 2, true); + + Util.Log("Client2 sending readyForEvents()."); + m_client2.Call(ReadyForEvents); + Thread.Sleep(5000); + Util.Log("Validating Client 2 Region 1."); + m_client2.Call(Validate, 2, 1, false); + Util.Log("Validating Client 2 Region 1."); + m_client2.Call(Validate, 2, 2, false); + + //Close Clients. + m_client1.Call(ClientDown); + m_client2.Call(ClientDown); + Util.Log("First step complete, tested true/false options."); + + //Test "server" and not set settings + m_client1.Call(InitDurableClient, 1, CacheHelper.Locators, "server"); + m_client2.Call(InitDurableClient, 2, CacheHelper.Locators, ""); + Util.Log("Clients initialized second times."); + + m_feeder.Call(FeederUpdate, 1); + Util.Log("Feeder performed second update."); + + Util.Log("Client1 sending readyForEvents()."); + m_client1.Call(ReadyForEvents); + Thread.Sleep(5000); + Util.Log("Validating Client 1 Region 1."); + m_client1.Call(Validate, 1, 1, true); + Util.Log("Validating Client 1 Region 2."); + m_client1.Call(Validate, 1, 2, false); + + Util.Log("Client2 sending readyForEvents()."); + m_client2.Call(ReadyForEvents); + Thread.Sleep(5000); + Util.Log("Validating Client 2 Region 1."); + m_client2.Call(Validate, 2, 1, true); + Util.Log("Validating Client 2 Region 2."); + m_client2.Call(Validate, 2, 2, false); + + //Close Clients. + m_client1.Call(ClientDown); + m_client2.Call(ClientDown); + m_feeder.Call(ClientDown); + Util.Log("Feeder and Clients closed."); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + + CacheHelper.ClearLocators(); + CacheHelper.ClearEndpoints(); + } + + [Test] + public void ConflationBasic() + { + runConflationBasic(); + } + + } +} http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientCqIRTestsN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientCqIRTestsN.cs b/clicache/integration-test/ThinClientCqIRTestsN.cs new file mode 100644 index 0000000..ea90850 --- /dev/null +++ b/clicache/integration-test/ThinClientCqIRTestsN.cs @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Apache.Geode.Client.UnitTests +{ + using NUnit.Framework; + using Apache.Geode.DUnitFramework; + using Apache.Geode.Client.Tests; + using Apache.Geode.Client; + + + [TestFixture] + [Category("group2")] + [Category("unicast_only")] + [Category("generics")] + + public class ThinClientCqIRTests : ThinClientRegionSteps + { + #region Private members + + private UnitProcess m_client1; + private UnitProcess m_client2; + private static string[] QueryRegionNames = { "Portfolios", "Positions", "Portfolios2", + "Portfolios3" }; + private static string QERegionName = "Portfolios"; + private static string CqName = "MyCq"; + + #endregion + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2 }; + } + + [TestFixtureSetUp] + public override void InitTests() + { + base.InitTests(); + m_client1.Call(InitClient); + m_client2.Call(InitClient); + } + + [TearDown] + public override void EndTest() + { + CacheHelper.StopJavaServers(); + base.EndTest(); + } + + + public void InitClient() + { + CacheHelper.Init(); + try + { + Serializable.RegisterTypeGeneric(Portfolio.CreateDeserializable, CacheHelper.DCache); + Serializable.RegisterTypeGeneric(Position.CreateDeserializable, CacheHelper.DCache); + } + catch (IllegalStateException) + { + // ignore since we run multiple iterations for pool and non pool configs + } + } + public void StepOne(string locators) + { + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[0], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[1], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[2], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[3], true, true, + null, locators, "__TESTPOOL1_", true); + + IRegion<object, object> region = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); + Apache.Geode.Client.RegionAttributes<object, object> regattrs = region.Attributes; + region.CreateSubRegion(QueryRegionNames[1], regattrs); + } + + public void StepTwo() + { + IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); + IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]); + IRegion<object, object> region1 = CacheHelper.GetRegion<object, object>(QueryRegionNames[1]); + IRegion<object, object> region2 = CacheHelper.GetRegion<object, object>(QueryRegionNames[2]); + IRegion<object, object> region3 = CacheHelper.GetRegion<object, object>(QueryRegionNames[3]); + + QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache); + Util.Log("SetSize {0}, NumSets {1}.", qh.PortfolioSetSize, + qh.PortfolioNumSets); + + qh.PopulatePortfolioData(region0, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePositionData(subRegion0, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePositionData(region1, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioData(region2, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioData(region3, qh.PortfolioSetSize, + qh.PortfolioNumSets); + } + + public void StepTwoQT() + { + IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); + IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]); + + QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache); + + qh.PopulatePortfolioData(region0, 100, 20, 100); + qh.PopulatePositionData(subRegion0, 100, 20); + } + + public void StepOneQE(string locators) + { + CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, + null, locators, "__TESTPOOL1_", true); + + IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); + Portfolio p1 = new Portfolio(1, 100); + Portfolio p2 = new Portfolio(2, 100); + Portfolio p3 = new Portfolio(3, 100); + Portfolio p4 = new Portfolio(4, 100); + + region["1"] = p1; + region["2"] = p2; + region["3"] = p3; + region["4"] = p4; + + QueryService<object, object> qs = null; + qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>(); + CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); + ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); + cqFac.AddCqListener(cqLstner); + CqAttributes<object, object> cqAttr = cqFac.Create(); + CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false); + ICqResults<object> results = qry.ExecuteWithInitialResults(); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + region["4"] = p1; + region["3"] = p2; + region["2"] = p3; + region["1"] = p4; + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + Util.Log("Results size {0}.", results.Size); + + SelectResultsIterator<object> iter = results.GetIterator(); + + while (iter.HasNext) + { + object item = iter.Next(); + if (item != null) + { + Struct st = item as Struct; + + string key = st["key"] as string; + + Assert.IsNotNull(key, "key is null"); + + Portfolio port = st["value"] as Portfolio; + + if (port == null) + { + Position pos = st["value"] as Position; + if (pos == null) + { + string cs = item as string; + + if (cs == null) + { + Assert.Fail("value is null"); + Util.Log("Query got other/unknown object."); + } + else + { + Util.Log("Query got string : {0}.", cs); + } + } + else + { + Util.Log("Query got Position object with secId {0}, shares {1}.", pos.SecId, pos.SharesOutstanding); + } + } + else + { + Util.Log("Query got Portfolio object with ID {0}, pkid {1}.", port.ID, port.Pkid); + } + } + } + qry = qs.GetCq(CqName); + qry.Stop(); + qry.Close(); + // Bring down the region + region.GetLocalView().DestroyRegion(); + } + + void runCqQueryIRTest() + { + CacheHelper.SetupJavaServers(true, "remotequeryN.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_client1.Call(StepOne, CacheHelper.Locators); + Util.Log("StepOne complete."); + + m_client1.Call(StepTwo); + Util.Log("StepTwo complete."); + + m_client1.Call(StepOneQE, CacheHelper.Locators); + Util.Log("StepOne complete."); + + m_client1.Call(Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + } + + [Test] + public void CqQueryIRTest() + { + runCqQueryIRTest(); + } + + } +} http://git-wip-us.apache.org/repos/asf/geode-native/blob/6cbd424f/clicache/integration-test/ThinClientCqTestsN.cs ---------------------------------------------------------------------- diff --git a/clicache/integration-test/ThinClientCqTestsN.cs b/clicache/integration-test/ThinClientCqTestsN.cs new file mode 100644 index 0000000..2dd8118 --- /dev/null +++ b/clicache/integration-test/ThinClientCqTestsN.cs @@ -0,0 +1,1025 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Apache.Geode.Client.UnitTests +{ + using NUnit.Framework; + using Apache.Geode.DUnitFramework; + using Apache.Geode.Client.Tests; + using Apache.Geode.Client; + + public class MyCqListener<TKey, TResult> : ICqListener<TKey, TResult> + { + #region Private members + private bool m_failedOver = false; + private UInt32 m_eventCountBefore = 0; + private UInt32 m_errorCountBefore = 0; + private UInt32 m_eventCountAfter = 0; + private UInt32 m_errorCountAfter = 0; + + #endregion + + #region Public accessors + + public void failedOver() + { + m_failedOver = true; + } + public UInt32 getEventCountBefore() + { + return m_eventCountBefore; + } + public UInt32 getErrorCountBefore() + { + return m_errorCountBefore; + } + public UInt32 getEventCountAfter() + { + return m_eventCountAfter; + } + public UInt32 getErrorCountAfter() + { + return m_errorCountAfter; + } + #endregion + + public virtual void OnEvent(CqEvent<TKey, TResult> ev) + { + Util.Log("MyCqListener::OnEvent called"); + if (m_failedOver == true) + m_eventCountAfter++; + else + m_eventCountBefore++; + + //IGeodeSerializable val = ev.getNewValue(); + //ICacheableKey key = ev.getKey(); + + TResult val = (TResult)ev.getNewValue(); + /*ICacheableKey*/ + TKey key = ev.getKey(); + + CqOperationType opType = ev.getQueryOperation(); + //CacheableString keyS = key as CacheableString; + string keyS = key.ToString(); //as string; + Portfolio pval = val as Portfolio; + PortfolioPdx pPdxVal = val as PortfolioPdx; + Assert.IsTrue((pPdxVal != null) || (pval != null)); + //string opStr = "DESTROY"; + /*if (opType == CqOperationType.OP_TYPE_CREATE) + opStr = "CREATE"; + else if (opType == CqOperationType.OP_TYPE_UPDATE) + opStr = "UPDATE";*/ + + //Util.Log("key {0}, value ({1},{2}), op {3}.", keyS, + // pval.ID, pval.Pkid, opStr); + } + public virtual void OnError(CqEvent<TKey, TResult> ev) + { + Util.Log("MyCqListener::OnError called"); + if (m_failedOver == true) + m_errorCountAfter++; + else + m_errorCountBefore++; + } + public virtual void Close() + { + Util.Log("MyCqListener::close called"); + } + public virtual void Clear() + { + Util.Log("MyCqListener::Clear called"); + m_eventCountBefore = 0; + m_errorCountBefore = 0; + m_eventCountAfter = 0; + m_errorCountAfter = 0; + } + } + + public class MyCqListener1<TKey, TResult> : ICqListener<TKey, TResult> + { + public static UInt32 m_cntEvents = 0; + + public virtual void OnEvent(CqEvent<TKey, TResult> ev) + { + m_cntEvents++; + Util.Log("MyCqListener1::OnEvent called"); + Object val = (Object)ev.getNewValue(); + Object pkey = (Object)ev.getKey(); + int value = (int)val; + int key = (int)pkey; + CqOperationType opType = ev.getQueryOperation(); + String opStr = "Default"; + if (opType == CqOperationType.OP_TYPE_CREATE) + opStr = "CREATE"; + else if (opType == CqOperationType.OP_TYPE_UPDATE) + opStr = "UPDATE"; + + Util.Log("MyCqListener1::OnEvent called with {0} , key = {1}, value = {2} ", + opStr, key, value); + } + public virtual void OnError(CqEvent<TKey, TResult> ev) + { + Util.Log("MyCqListener1::OnError called"); + } + public virtual void Close() + { + Util.Log("MyCqListener1::close called"); + } + } + + + + public class MyCqStatusListener<TKey, TResult> : ICqStatusListener<TKey, TResult> + { + #region Private members + private bool m_failedOver = false; + private UInt32 m_eventCountBefore = 0; + private UInt32 m_errorCountBefore = 0; + private UInt32 m_eventCountAfter = 0; + private UInt32 m_errorCountAfter = 0; + private UInt32 m_CqConnectedCount = 0; + private UInt32 m_CqDisConnectedCount = 0; + + #endregion + + #region Public accessors + + public MyCqStatusListener(int id) + { + } + + public void failedOver() + { + m_failedOver = true; + } + public UInt32 getEventCountBefore() + { + return m_eventCountBefore; + } + public UInt32 getErrorCountBefore() + { + return m_errorCountBefore; + } + public UInt32 getEventCountAfter() + { + return m_eventCountAfter; + } + public UInt32 getErrorCountAfter() + { + return m_errorCountAfter; + } + public UInt32 getCqConnectedCount() + { + return m_CqConnectedCount; + } + public UInt32 getCqDisConnectedCount() + { + return m_CqDisConnectedCount; + } + #endregion + + public virtual void OnEvent(CqEvent<TKey, TResult> ev) + { + Util.Log("MyCqStatusListener::OnEvent called"); + if (m_failedOver == true) + m_eventCountAfter++; + else + m_eventCountBefore++; + + TResult val = (TResult)ev.getNewValue(); + TKey key = ev.getKey(); + + CqOperationType opType = ev.getQueryOperation(); + string keyS = key.ToString(); //as string; + } + public virtual void OnError(CqEvent<TKey, TResult> ev) + { + Util.Log("MyCqStatusListener::OnError called"); + if (m_failedOver == true) + m_errorCountAfter++; + else + m_errorCountBefore++; + } + public virtual void Close() + { + Util.Log("MyCqStatusListener::close called"); + } + public virtual void OnCqConnected() + { + m_CqConnectedCount++; + Util.Log("MyCqStatusListener::OnCqConnected called"); + } + public virtual void OnCqDisconnected() + { + m_CqDisConnectedCount++; + Util.Log("MyCqStatusListener::OnCqDisconnected called"); + } + + public virtual void Clear() + { + Util.Log("MyCqStatusListener::Clear called"); + m_eventCountBefore = 0; + m_errorCountBefore = 0; + m_eventCountAfter = 0; + m_errorCountAfter = 0; + m_CqConnectedCount = 0; + m_CqDisConnectedCount = 0; + } + } + + [TestFixture] + [Category("group3")] + [Category("unicast_only")] + [Category("generics")] + + public class ThinClientCqTests : ThinClientRegionSteps + { + #region Private members + private static bool m_usePdxObjects = false; + private UnitProcess m_client1; + private UnitProcess m_client2; + private static string[] QueryRegionNames = { "Portfolios", "Positions", "Portfolios2", + "Portfolios3" }; + private static string QERegionName = "Portfolios"; + private static string CqName = "MyCq"; + + private static string CqName1 = "testCQAllServersLeave"; + private static string CqName2 = "testCQAllServersLeave1"; + + private static string CqQuery1 = "select * from /DistRegionAck"; + private static string CqQuery2 = "select * from /DistRegionAck1"; + //private static string CqName1 = "MyCq1"; + + #endregion + + protected override ClientBase[] GetClients() + { + m_client1 = new UnitProcess(); + m_client2 = new UnitProcess(); + return new ClientBase[] { m_client1, m_client2 }; + } + + [TestFixtureSetUp] + public override void InitTests() + { + base.InitTests(); + m_client1.Call(InitClient); + m_client2.Call(InitClient); + } + + [TearDown] + public override void EndTest() + { + CacheHelper.StopJavaServers(); + base.EndTest(); + } + + + public void InitClient() + { + CacheHelper.Init(); + try + { + Serializable.RegisterTypeGeneric(Portfolio.CreateDeserializable, CacheHelper.DCache); + Serializable.RegisterTypeGeneric(Position.CreateDeserializable, CacheHelper.DCache); + } + catch (IllegalStateException) + { + // ignore since we run multiple iterations for pool and non pool configs + } + } + public void StepOne(string locators) + { + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[0], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[1], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[2], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>(QueryRegionNames[3], true, true, + null, locators, "__TESTPOOL1_", true); + CacheHelper.CreateTCRegion_Pool<object, object>("DistRegionAck", true, true, + null, locators, "__TESTPOOL1_", true); + IRegion<object, object> region = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); + Apache.Geode.Client.RegionAttributes<object, object> regattrs = region.Attributes; + region.CreateSubRegion(QueryRegionNames[1], regattrs); + } + + public void StepTwo(bool usePdxObject) + { + IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); + IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]); + IRegion<object, object> region1 = CacheHelper.GetRegion<object, object>(QueryRegionNames[1]); + IRegion<object, object> region2 = CacheHelper.GetRegion<object, object>(QueryRegionNames[2]); + IRegion<object, object> region3 = CacheHelper.GetRegion<object, object>(QueryRegionNames[3]); + + QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache); + Util.Log("Object type is pdx = " + m_usePdxObjects); + + Util.Log("SetSize {0}, NumSets {1}.", qh.PortfolioSetSize, + qh.PortfolioNumSets); + + if (!usePdxObject) + { + qh.PopulatePortfolioData(region0, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePositionData(subRegion0, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePositionData(region1, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioData(region2, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioData(region3, qh.PortfolioSetSize, + qh.PortfolioNumSets); + } + else + { + Serializable.RegisterPdxType(PortfolioPdx.CreateDeserializable); + Serializable.RegisterPdxType(PositionPdx.CreateDeserializable); + qh.PopulatePortfolioPdxData(region0, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioPdxData(subRegion0, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioPdxData(region1, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioPdxData(region2, qh.PortfolioSetSize, + qh.PortfolioNumSets); + qh.PopulatePortfolioPdxData(region3, qh.PortfolioSetSize, + qh.PortfolioNumSets); + } + } + + public void StepTwoQT() + { + IRegion<object, object> region0 = CacheHelper.GetRegion<object, object>(QueryRegionNames[0]); + IRegion<object, object> subRegion0 = region0.GetSubRegion(QueryRegionNames[1]); + + QueryHelper<object, object> qh = QueryHelper<object, object>.GetHelper(CacheHelper.DCache); + + qh.PopulatePortfolioData(region0, 100, 20, 100); + qh.PopulatePositionData(subRegion0, 100, 20); + } + + public void StepOneQE(string locators) + { + CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, + null, locators, "__TESTPOOL1_", true); + IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); + Portfolio p1 = new Portfolio(1, 100); + Portfolio p2 = new Portfolio(2, 100); + Portfolio p3 = new Portfolio(3, 100); + Portfolio p4 = new Portfolio(4, 100); + + region["1"] = p1; + region["2"] = p2; + region["3"] = p3; + region["4"] = p4; + + QueryService<object, object> qs = null; + + qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>(); + CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); + ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); + cqFac.AddCqListener(cqLstner); + CqAttributes<object, object> cqAttr = cqFac.Create(); + CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false); + qry.Execute(); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + region["4"] = p1; + region["3"] = p2; + region["2"] = p3; + region["1"] = p4; + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + + qry = qs.GetCq(CqName); + + CqServiceStatistics cqSvcStats = qs.GetCqStatistics(); + Assert.AreEqual(1, cqSvcStats.numCqsActive()); + Assert.AreEqual(1, cqSvcStats.numCqsCreated()); + Assert.AreEqual(1, cqSvcStats.numCqsOnClient()); + + cqAttr = qry.GetCqAttributes(); + ICqListener<object, object>[] vl = cqAttr.getCqListeners(); + Assert.IsNotNull(vl); + Assert.AreEqual(1, vl.Length); + cqLstner = vl[0]; + Assert.IsNotNull(cqLstner); + MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; + Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore()); + + CqStatistics cqStats = qry.GetStatistics(); + Assert.AreEqual(cqStats.numEvents(), myLisner.getEventCountBefore()); + if (myLisner.getEventCountBefore() + myLisner.getErrorCountBefore() == 0) + { + Assert.Fail("cq before count zero"); + } + qry.Stop(); + Assert.AreEqual(1, cqSvcStats.numCqsStopped()); + qry.Close(); + Assert.AreEqual(1, cqSvcStats.numCqsClosed()); + // Bring down the region + region.GetLocalView().DestroyRegion(); + } + + public void StepOnePdxQE(string locators) + { + CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, + null, locators, "__TESTPOOL1_", true); + IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); + PortfolioPdx p1 = new PortfolioPdx(1, 100); + PortfolioPdx p2 = new PortfolioPdx(2, 100); + PortfolioPdx p3 = new PortfolioPdx(3, 100); + PortfolioPdx p4 = new PortfolioPdx(4, 100); + + region["1"] = p1; + region["2"] = p2; + region["3"] = p3; + region["4"] = p4; + + QueryService<object, object> qs = null; + + qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>(); + CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); + ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); + cqFac.AddCqListener(cqLstner); + CqAttributes<object, object> cqAttr = cqFac.Create(); + CqQuery<object, object> qry = qs.NewCq(CqName, "select * from /" + QERegionName + " p where p.ID!=2", cqAttr, false); + qry.Execute(); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + region["4"] = p1; + region["3"] = p2; + region["2"] = p3; + region["1"] = p4; + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + + qry = qs.GetCq(CqName); + + CqServiceStatistics cqSvcStats = qs.GetCqStatistics(); + Assert.AreEqual(1, cqSvcStats.numCqsActive()); + Assert.AreEqual(1, cqSvcStats.numCqsCreated()); + Assert.AreEqual(1, cqSvcStats.numCqsOnClient()); + + cqAttr = qry.GetCqAttributes(); + ICqListener<object, object>[] vl = cqAttr.getCqListeners(); + Assert.IsNotNull(vl); + Assert.AreEqual(1, vl.Length); + cqLstner = vl[0]; + Assert.IsNotNull(cqLstner); + MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; + Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore()); + + CqStatistics cqStats = qry.GetStatistics(); + Assert.AreEqual(cqStats.numEvents(), myLisner.getEventCountBefore()); + if (myLisner.getEventCountBefore() + myLisner.getErrorCountBefore() == 0) + { + Assert.Fail("cq before count zero"); + } + qry.Stop(); + Assert.AreEqual(1, cqSvcStats.numCqsStopped()); + qry.Close(); + Assert.AreEqual(1, cqSvcStats.numCqsClosed()); + // Bring down the region + region.GetLocalView().DestroyRegion(); + } + public void KillServer() + { + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + } + + public delegate void KillServerDelegate(); + + /* + public void StepOneFailover() + { + // This is here so that Client1 registers information of the cacheserver + // that has been already started + CacheHelper.SetupJavaServers("remotequery.xml", + "cqqueryfailover.xml"); + CacheHelper.StartJavaServer(1, "GFECS1"); + Util.Log("Cacheserver 1 started."); + + CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true); + + Region region = CacheHelper.GetVerifyRegion(QueryRegionNames[0]); + Portfolio p1 = new Portfolio(1, 100); + Portfolio p2 = new Portfolio(2, 200); + Portfolio p3 = new Portfolio(3, 300); + Portfolio p4 = new Portfolio(4, 400); + + region.Put("1", p1); + region.Put("2", p2); + region.Put("3", p3); + region.Put("4", p4); + } + */ + /* + public void StepTwoFailover() + { + CacheHelper.StartJavaServer(2, "GFECS2"); + Util.Log("Cacheserver 2 started."); + + IAsyncResult killRes = null; + KillServerDelegate ksd = new KillServerDelegate(KillServer); + CacheHelper.CreateTCRegion(QueryRegionNames[0], true, true, null, true); + + IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QueryRegionNames[0]); + + QueryService qs = CacheHelper.DCache.GetQueryService(); + CqAttributesFactory cqFac = new CqAttributesFactory(); + ICqListener cqLstner = new MyCqListener(); + cqFac.AddCqListener(cqLstner); + CqAttributes cqAttr = cqFac.Create(); + CqQuery qry = qs.NewCq(CqName1, "select * from /" + QERegionName + " p where p.ID!<4", cqAttr, true); + qry.Execute(); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + qry = qs.GetCq(CqName1); + cqAttr = qry.GetCqAttributes(); + ICqListener[] vl = cqAttr.getCqListeners(); + Assert.IsNotNull(vl); + Assert.AreEqual(1, vl.Length); + cqLstner = vl[0]; + Assert.IsNotNull(cqLstner); + MyCqListener myLisner = cqLstner as MyCqListener; + if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() != 0) + { + Assert.Fail("cq after count not zero"); + } + + killRes = ksd.BeginInvoke(null, null); + Thread.Sleep(18000); // sleep 0.3min to allow failover complete + myLisner.failedOver(); + + Portfolio p1 = new Portfolio(1, 100); + Portfolio p2 = new Portfolio(2, 200); + Portfolio p3 = new Portfolio(3, 300); + Portfolio p4 = new Portfolio(4, 400); + + region.Put("4", p1); + region.Put("3", p2); + region.Put("2", p3); + region.Put("1", p4); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + + qry = qs.GetCq(CqName1); + cqAttr = qry.GetCqAttributes(); + vl = cqAttr.getCqListeners(); + cqLstner = vl[0]; + Assert.IsNotNull(vl); + Assert.AreEqual(1, vl.Length); + cqLstner = vl[0]; + Assert.IsNotNull(cqLstner); + myLisner = cqLstner as MyCqListener; + if (myLisner.getEventCountAfter() + myLisner.getErrorCountAfter() == 0) + { + Assert.Fail("no cq after failover"); + } + + killRes.AsyncWaitHandle.WaitOne(); + ksd.EndInvoke(killRes); + qry.Stop(); + qry.Close(); + } + */ + + public void ProcessCQ(string locators) + { + CacheHelper.CreateTCRegion_Pool<object, object>(QERegionName, true, true, + null, locators, "__TESTPOOL1_", true); + + IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(QERegionName); + Portfolio p1 = new Portfolio(1, 100); + Portfolio p2 = new Portfolio(2, 100); + Portfolio p3 = new Portfolio(3, 100); + Portfolio p4 = new Portfolio(4, 100); + + region["1"] = p1; + region["2"] = p2; + region["3"] = p3; + region["4"] = p4; + + QueryService<object, object> qs = null; + + qs = CacheHelper.DCache.GetPoolManager().Find("__TESTPOOL1_").GetQueryService<object, object>(); + + CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); + ICqListener<object, object> cqLstner = new MyCqListener<object, object>(); + ICqStatusListener<object, object> cqStatusLstner = new MyCqStatusListener<object, object>(1); + + ICqListener<object, object>[] v = new ICqListener<object, object>[2]; + cqFac.AddCqListener(cqLstner); + v[0] = cqLstner; + v[1] = cqStatusLstner; + cqFac.InitCqListeners(v); + Util.Log("InitCqListeners called"); + CqAttributes<object, object> cqAttr = cqFac.Create(); + CqQuery<object, object> qry1 = qs.NewCq("CQ1", "select * from /" + QERegionName + " p where p.ID >= 1", cqAttr, false); + qry1.Execute(); + + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + region["4"] = p1; + region["3"] = p2; + region["2"] = p3; + region["1"] = p4; + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + + qry1 = qs.GetCq("CQ1"); + cqAttr = qry1.GetCqAttributes(); + ICqListener<object, object>[] vl = cqAttr.getCqListeners(); + Assert.IsNotNull(vl); + Assert.AreEqual(2, vl.Length); + cqLstner = vl[0]; + Assert.IsNotNull(cqLstner); + MyCqListener<object, object> myLisner = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; + Util.Log("event count:{0}, error count {1}.", myLisner.getEventCountBefore(), myLisner.getErrorCountBefore()); + Assert.AreEqual(4, myLisner.getEventCountBefore()); + + cqStatusLstner = (ICqStatusListener<object, object>)vl[1]; + Assert.IsNotNull(cqStatusLstner); + MyCqStatusListener<object, object> myStatLisner = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>; + Util.Log("event count:{0}, error count {1}.", myStatLisner.getEventCountBefore(), myStatLisner.getErrorCountBefore()); + Assert.AreEqual(1, myStatLisner.getCqConnectedCount()); + Assert.AreEqual(4, myStatLisner.getEventCountBefore()); + + CqAttributesMutator<object, object> mutator = qry1.GetCqAttributesMutator(); + mutator.RemoveCqListener(cqLstner); + cqAttr = qry1.GetCqAttributes(); + Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); + Assert.AreEqual(1, cqAttr.getCqListeners().Length); + + mutator.RemoveCqListener(cqStatusLstner); + cqAttr = qry1.GetCqAttributes(); + Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); + Assert.AreEqual(0, cqAttr.getCqListeners().Length); + + ICqListener<object, object>[] v2 = new ICqListener<object, object>[2]; + v2[0] = cqLstner; + v2[1] = cqStatusLstner; + MyCqListener<object, object> myLisner2 = (MyCqListener<object, object>)cqLstner; + myLisner2.Clear(); + MyCqStatusListener<object, object> myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner; + myStatLisner2.Clear(); + mutator.SetCqListeners(v2); + cqAttr = qry1.GetCqAttributes(); + Assert.AreEqual(2, cqAttr.getCqListeners().Length); + + region["4"] = p1; + region["3"] = p2; + region["2"] = p3; + region["1"] = p4; + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + + qry1 = qs.GetCq("CQ1"); + cqAttr = qry1.GetCqAttributes(); + ICqListener<object, object>[] v3 = cqAttr.getCqListeners(); + Assert.IsNotNull(v3); + Assert.AreEqual(2, vl.Length); + cqLstner = v3[0]; + Assert.IsNotNull(cqLstner); + myLisner2 = (MyCqListener<object, object>)cqLstner;// as MyCqListener<object, object>; + Util.Log("event count:{0}, error count {1}.", myLisner2.getEventCountBefore(), myLisner2.getErrorCountBefore()); + Assert.AreEqual(4, myLisner2.getEventCountBefore()); + + cqStatusLstner = (ICqStatusListener<object, object>)v3[1]; + Assert.IsNotNull(cqStatusLstner); + myStatLisner2 = (MyCqStatusListener<object, object>)cqStatusLstner;// as MyCqStatusListener<object, object>; + Util.Log("event count:{0}, error count {1}.", myStatLisner2.getEventCountBefore(), myStatLisner2.getErrorCountBefore()); + Assert.AreEqual(0, myStatLisner2.getCqConnectedCount()); + Assert.AreEqual(4, myStatLisner2.getEventCountBefore()); + + mutator = qry1.GetCqAttributesMutator(); + mutator.RemoveCqListener(cqLstner); + cqAttr = qry1.GetCqAttributes(); + Util.Log("cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); + Assert.AreEqual(1, cqAttr.getCqListeners().Length); + + mutator.RemoveCqListener(cqStatusLstner); + cqAttr = qry1.GetCqAttributes(); + Util.Log("1 cqAttr.getCqListeners().Length = {0}", cqAttr.getCqListeners().Length); + Assert.AreEqual(0, cqAttr.getCqListeners().Length); + + region["4"] = p1; + region["3"] = p2; + region["2"] = p3; + region["1"] = p4; + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + + qry1 = qs.GetCq("CQ1"); + cqAttr = qry1.GetCqAttributes(); + ICqListener<object, object>[] v4 = cqAttr.getCqListeners(); + Assert.IsNotNull(v4); + Assert.AreEqual(0, v4.Length); + Util.Log("cqAttr.getCqListeners() done"); + } + + public void CreateAndExecuteCQ_StatusListener(string poolName, string cqName, string cqQuery, int id) + { + QueryService<object, object> qs = null; + qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>(); + CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); + cqFac.AddCqListener(new MyCqStatusListener<object, object>(id)); + CqAttributes<object, object> cqAttr = cqFac.Create(); + CqQuery<object, object> qry = qs.NewCq(cqName, cqQuery, cqAttr, false); + qry.Execute(); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + } + + public void CreateAndExecuteCQ_Listener(string poolName, string cqName, string cqQuery, int id) + { + QueryService<object, object> qs = null; + qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>(); + CqAttributesFactory<object, object> cqFac = new CqAttributesFactory<object, object>(); + cqFac.AddCqListener(new MyCqListener<object, object>(/*id*/)); + CqAttributes<object, object> cqAttr = cqFac.Create(); + CqQuery<object, object> qry = qs.NewCq(cqName, cqQuery, cqAttr, false); + qry.Execute(); + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + } + + public void CheckCQStatusOnConnect(string poolName, string cqName, int onCqStatusConnect) + { + QueryService<object, object> qs = null; + qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>(); + CqQuery<object, object> query = qs.GetCq(cqName); + CqAttributes<object, object> cqAttr = query.GetCqAttributes(); + ICqListener<object, object>[] vl = cqAttr.getCqListeners(); + MyCqStatusListener<object, object> myCqStatusLstr = (MyCqStatusListener<object, object>) vl[0]; + Util.Log("CheckCQStatusOnConnect = {0} ", myCqStatusLstr.getCqConnectedCount()); + Assert.AreEqual(onCqStatusConnect, myCqStatusLstr.getCqConnectedCount()); + } + + public void CheckCQStatusOnDisConnect(string poolName, string cqName, int onCqStatusDisConnect) + { + QueryService<object, object> qs = null; + qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>(); + CqQuery<object, object> query = qs.GetCq(cqName); + CqAttributes<object, object> cqAttr = query.GetCqAttributes(); + ICqListener<object, object>[] vl = cqAttr.getCqListeners(); + MyCqStatusListener<object, object> myCqStatusLstr = (MyCqStatusListener<object, object>)vl[0]; + Util.Log("CheckCQStatusOnDisConnect = {0} ", myCqStatusLstr.getCqDisConnectedCount()); + Assert.AreEqual(onCqStatusDisConnect, myCqStatusLstr.getCqDisConnectedCount()); + } + + public void PutEntries(string regionName) + { + IRegion<object, object> region = CacheHelper.GetVerifyRegion<object, object>(regionName); + for (int i = 1; i <= 10; i++) { + region["key-" + i] = "val-" + i; + } + Thread.Sleep(18000); // sleep 0.3min to allow server c query to complete + } + + public void CheckCQStatusOnPutEvent(string poolName, string cqName, int onCreateCount) + { + QueryService<object, object> qs = null; + qs = CacheHelper.DCache.GetPoolManager().Find(poolName).GetQueryService<object, object>(); + CqQuery<object, object> query = qs.GetCq(cqName); + CqAttributes<object, object> cqAttr = query.GetCqAttributes(); + ICqListener<object, object>[] vl = cqAttr.getCqListeners(); + MyCqStatusListener<object, object> myCqStatusLstr = (MyCqStatusListener<object, object>)vl[0]; + Util.Log("CheckCQStatusOnPutEvent = {0} ", myCqStatusLstr.getEventCountBefore()); + Assert.AreEqual(onCreateCount, myCqStatusLstr.getEventCountBefore()); + } + + public void CreateRegion(string locators, string servergroup, string regionName, string poolName) + { + CacheHelper.CreateTCRegion_Pool<object, object>(regionName, true, true, + null, locators, poolName, true, servergroup); + } + + void runCqQueryTest() + { + CacheHelper.SetupJavaServers(true, "remotequeryN.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_client1.Call(StepOne, CacheHelper.Locators); + Util.Log("StepOne complete."); + + m_client1.Call(StepTwo, m_usePdxObjects); + Util.Log("StepTwo complete."); + + if (!m_usePdxObjects) + m_client1.Call(StepOneQE, CacheHelper.Locators); + else + m_client1.Call(StepOnePdxQE, CacheHelper.Locators); + Util.Log("StepOne complete."); + + m_client1.Call(Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + } + + void runCqQueryStatusTest() + { + CacheHelper.SetupJavaServers(true, "cacheserver.xml", "cacheserver2.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_client1.Call(StepOne, CacheHelper.Locators); + Util.Log("StepOne complete."); + + m_client1.Call(CreateAndExecuteCQ_StatusListener, "__TESTPOOL1_", CqName1, CqQuery1, 100); + Util.Log("CreateAndExecuteCQ complete."); + + m_client1.Call(CheckCQStatusOnConnect, "__TESTPOOL1_", CqName1, 1); + Util.Log("CheckCQStatusOnConnect complete."); + + m_client1.Call(PutEntries, "DistRegionAck"); + Util.Log("PutEntries complete."); + + m_client1.Call(CheckCQStatusOnPutEvent, "__TESTPOOL1_", CqName1, 10); + Util.Log("CheckCQStatusOnPutEvent complete."); + + CacheHelper.SetupJavaServers(true, "cacheserver.xml", "cacheserver2.xml"); + CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1); + Util.Log("start server 2 complete."); + + Thread.Sleep(20000); + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + Thread.Sleep(20000); + m_client1.Call(CheckCQStatusOnDisConnect, "__TESTPOOL1_", CqName1, 0); + Util.Log("CheckCQStatusOnDisConnect complete."); + + CacheHelper.StopJavaServer(2); + Util.Log("Cacheserver 2 stopped."); + Thread.Sleep(20000); + m_client1.Call(CheckCQStatusOnDisConnect, "__TESTPOOL1_", CqName1, 1); + Util.Log("CheckCQStatusOnDisConnect complete."); + + CacheHelper.SetupJavaServers(true, "cacheserver.xml", "cacheserver2.xml"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + Thread.Sleep(20000); + + m_client1.Call(CheckCQStatusOnConnect, "__TESTPOOL1_", CqName1, 2); + Util.Log("CheckCQStatusOnConnect complete."); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + Thread.Sleep(20000); + + m_client1.Call(CheckCQStatusOnDisConnect, "__TESTPOOL1_", CqName1, 2); + Util.Log("CheckCQStatusOnDisConnect complete."); + + m_client1.Call(Close); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + } + + void runCqQueryStatusTest2() + { + CacheHelper.SetupJavaServers(true, "cacheserver_servergroup.xml", "cacheserver_servergroup2.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("start server 1 complete."); + CacheHelper.StartJavaServerWithLocators(2, "GFECS2", 1); + Util.Log("start server 2 complete."); + + m_client1.Call(CreateRegion, CacheHelper.Locators, "group1", "DistRegionAck", "__TESTPOOL1_"); + Util.Log("CreateRegion DistRegionAck complete."); + + m_client1.Call(CreateRegion, CacheHelper.Locators, "group2", "DistRegionAck1", "__TESTPOOL2_"); + Util.Log("CreateRegion DistRegionAck1 complete."); + + m_client1.Call(CreateAndExecuteCQ_StatusListener, "__TESTPOOL1_", CqName1, CqQuery1, 100); + Util.Log("CreateAndExecuteCQ1 complete."); + + m_client1.Call(CreateAndExecuteCQ_StatusListener, "__TESTPOOL2_", CqName2, CqQuery2, 101); + Util.Log("CreateAndExecuteCQ2 complete."); + + m_client1.Call(CheckCQStatusOnConnect, "__TESTPOOL1_", CqName1, 1); + Util.Log("CheckCQStatusOnConnect1 complete."); + + m_client1.Call(CheckCQStatusOnConnect, "__TESTPOOL2_", CqName2, 1); + Util.Log("CheckCQStatusOnConnect2 complete."); + + m_client1.Call(PutEntries, "DistRegionAck"); + Util.Log("PutEntries1 complete."); + + m_client1.Call(PutEntries, "DistRegionAck1"); + Util.Log("PutEntries2 complete."); + + m_client1.Call(CheckCQStatusOnPutEvent, "__TESTPOOL1_", CqName1, 10); + Util.Log("CheckCQStatusOnPutEvent complete."); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + Thread.Sleep(20000); + + m_client1.Call(CheckCQStatusOnDisConnect, "__TESTPOOL1_", CqName1, 1); + Util.Log("CheckCQStatusOnDisConnect complete."); + + CacheHelper.StopJavaServer(2); + Util.Log("Cacheserver 2 stopped."); + Thread.Sleep(20000); + + m_client1.Call(CheckCQStatusOnDisConnect, "__TESTPOOL2_", CqName2, 1); + Util.Log("CheckCQStatusOnDisConnect complete."); + + m_client1.Call(Close); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + } + + void runCqQueryStatusTest3() + { + CacheHelper.SetupJavaServers(true, "remotequeryN.xml"); + CacheHelper.StartJavaLocator(1, "GFELOC"); + Util.Log("Locator started"); + CacheHelper.StartJavaServerWithLocators(1, "GFECS1", 1); + Util.Log("Cacheserver 1 started."); + + m_client1.Call(ProcessCQ, CacheHelper.Locators); + Util.Log("ProcessCQ complete."); + + m_client1.Call(Close); + + CacheHelper.StopJavaServer(1); + Util.Log("Cacheserver 1 stopped."); + + CacheHelper.StopJavaLocator(1); + Util.Log("Locator stopped"); + } + + [Test] + public void CqQueryTest() + { + runCqQueryTest(); + } + + [Test] + public void CqQueryPdxTest() + { + m_usePdxObjects = true; + runCqQueryTest(); + m_usePdxObjects = false; + } + + // [Test] + // public void CqFailover() + // { + // try + // { + // m_client1.Call(StepOneFailover); + // Util.Log("StepOneFailover complete."); + // + // m_client1.Call(StepTwoFailover); + // Util.Log("StepTwoFailover complete."); + // } + // finally + // { + // m_client1.Call(CacheHelper.StopJavaServers); + // } + // } + + [Test] + public void CqQueryStatusTest() + { + runCqQueryStatusTest(); + } + + [Test] + public void CqQueryStatusTest2() + { + runCqQueryStatusTest2(); + } + + [Test] + public void CqQueryStatusTest3() + { + runCqQueryStatusTest3(); + } + + } +}