Merge branch 'master' into remove-inmemory-event-service

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fad8ecdb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fad8ecdb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fad8ecdb

Branch: refs/heads/remove-inmemory-event-service
Commit: fad8ecdb08e58a9a72a9268763dfee48e87d2b55
Parents: 5eed63d 59edea1
Author: Shawn Feldman <sfeld...@apache.org>
Authored: Tue Oct 20 13:16:30 2015 -0600
Committer: Shawn Feldman <sfeld...@apache.org>
Committed: Tue Oct 20 13:16:30 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  15 ++-
 .../model/ElasticsearchIndexEvent.java          |   3 +-
 .../corepersistence/index/PublishRxTest.java    |  95 ----------------
 .../usergrid/corepersistence/index/RxTest.java  | 108 +++++++++++++++++++
 .../core/astyanax/CassandraConfig.java          |   8 +-
 .../core/astyanax/CassandraConfigImpl.java      |   1 +
 .../persistence/core/astyanax/CassandraFig.java |   6 +-
 .../map/impl/MapSerializationImpl.java          |  37 ++-----
 .../queue/impl/SNSQueueManagerImpl.java         |  10 +-
 9 files changed, 140 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fad8ecdb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fad8ecdb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
index 0000000,1d940d0..d7b0bdb
mode 000000,100644..100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@@ -1,0 -1,108 +1,108 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.usergrid.corepersistence.index;
+ 
+ 
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
+ import rx.Observable;
+ import rx.Subscription;
+ import rx.observables.ConnectableObservable;
+ import rx.schedulers.Schedulers;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ 
+ /**
+  * Test to test some assumptions about RX behaviors
+  */
+ public class RxTest {
+ 
+     @Test
+     public void testPublish() throws InterruptedException {
+ 
+         final int count = 10;
+ 
+         final CountDownLatch latch = new CountDownLatch( count );
+ 
+         final Subscription connectedObservable =
+             Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
+                       .subscribe();
+ 
+ 
 -        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
++        final boolean completed = latch.await( 3, TimeUnit.SECONDS );
+ 
+         assertTrue( "publish1 behaves as expected", completed );
+ 
+         final boolean completedSubscription = connectedObservable.isUnsubscribed();
+ 
+         assertTrue( "Subscription complete", completedSubscription );
+     }
+ 
+ 
+     @Test
+     @Ignore("This seems like it should work, yet blocks forever")
+     public void testConnectableObserver() throws InterruptedException {
+ 
+         final int count = 10;
+ 
+         final CountDownLatch latch = new CountDownLatch( count );
+ 
+         final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
+ 
+ 
+         //connect to our latch, which should run on it's own subscription
+         //start our latch running
+         connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
+ 
+ 
+         final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
+ 
+         //start the sequence
+         connectedObservable.connect();
+ 
+ 
+         final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+ 
+         assertTrue( "publish1 behaves as expected", completed );
+ 
+         final int returnedCount = countObservable.toBlocking().last();
+ 
+         assertEquals( "Counts the same", count, returnedCount );
+     }
+ 
+ 
+     /**
+      * Tests that reduce emits
+      */
+     @Test
+     public void testReduceEmpty(){
+        final int result =  Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
+ 
+         assertEquals(0, result);
+     }
+ 
+ 
+ }

Reply via email to