You might find this easier to follow: public Promise<?>printEvens() { PushStreamProvider psp = new PushStreamProvider();
SimplePushEventSource<Long> ses = psp.createSimpleEventSource(Long.class); // Begin delivery when someone is listening ses.connectPromise().then(onConnect(ses)); // Create a listener which prints out even numbers return psp.createStream(ses). filter(l -> l % 2L == 0). limit(5000L). forEach(f -> System.out.println("Consumed event: " + f)); } private Success<Void, Void> onConnect(SimplePushEventSource<Long> ses) { return p -> { new Thread(() -> { long counter = 0; // Keep going as long as someone is listening while (ses.isConnected()) { ses.publish(++counter); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Published: " + counter); } // Restart delivery when a new listener connects ses.connectPromise().then(onConnect(ses)); }).start(); return null; }; } > On 26 Oct 2018, at 15:52, stbischof via osgi-dev <osgi-dev@mail.osgi.org> > wrote: > > PushStreamProvider psp = new PushStreamProvider(); > > SimplePushEventSource<Long> ses = psp.createSimpleEventSource(Long.class)) > > Success<Void,Void> onConnect = p -> { > new Thread(() -> { > long counter = 0; > // Keep going as long as someone is listening > while (ses.isConnected()) { > ses.publish(++counter); > Thread.sleep(100); > System.out.println("Published: " + counter); > } > // Restart delivery when a new listener connects > ses.connectPromise().then(onConnect); > }).start(); > return null; > }; > > // Begin delivery when someone is listening > ses.connectPromise().then(onConnect); > > // Create a listener which prints out even numbers > psp.createStream(ses). > filter(l -> l % 2L == 0). > limit(5000L). > > forEach(f -> System.out.println("Consumed event: " + f));
_______________________________________________ OSGi Developer Mail List osgi-dev@mail.osgi.org https://mail.osgi.org/mailman/listinfo/osgi-dev