Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-49 [created] 24e427fab
bumped deps very basic test passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/24e427fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/24e427fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/24e427fa

Branch: refs/heads/STREAMS-49
Commit: 24e427fabe1a23cafc2a8dd16bfc9891a50a805b
Parents: eec3aa9
Author: sblackmon <[email protected]>
Authored: Thu Oct 30 19:28:46 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Thu Oct 30 19:28:46 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-kafka/README.md |  21 ++++
 streams-contrib/streams-persist-kafka/pom.xml   |  57 ++++++-----
 .../streams/kafka/KafkaPersistReader.java       |  74 ++++++--------
 .../streams/kafka/KafkaPersistReaderTask.java   |  22 ++--
 .../streams/kafka/KafkaPersistWriter.java       |  43 ++------
 .../streams/kafka/KafkaPersistWriterTask.java   |   2 +-
 .../streams/kafka/StreamsPartitioner.java       |  10 +-
 .../src/main/resources/reference.properties     |  16 ---
 .../streams/kafka/test/TestKafkaCluster.java    |  51 ++++++++++
 .../streams/kafka/test/TestKafkaPersist.java    | 101 +++++++++++++++++++
 10 files changed, 261 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/README.md b/streams-contrib/streams-persist-kafka/README.md
new file mode 100644
index 0000000..dce830e
--- /dev/null
+++ b/streams-contrib/streams-persist-kafka/README.md
@@ -0,0 +1,21 @@
+streams-persist-kafka
+=====================
+
+Read and write to Kafka
+
+Example reader/writer configuration:
+
+    kafka.metadata.broker.list=localhost:9092
+
+    kafka.zk.connect=localhost:2181
+
+    kafka.topic=topic
+
+    kafka.groupid=group
+
+java -cp jar -Dconfig.file={json/hocon typesafe config} \
+    -Dkafka.metadata.broker.list=localhost:9092 class \
+    -Dkafka.zk.connect=localhost:2181 \
+    -Dkafka.topic=topic \
+    -Dkafka.groupid=group
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml
index f86a0f3..4e9e375 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -12,8 +12,8 @@
     <artifactId>streams-persist-kafka</artifactId>
 
     <properties>
-        <scala.version>2.9.2</scala.version>
-        <kafka.version>0.8.0</kafka.version>
+        <scala.version>2.10</scala.version>
+        <kafka.version>0.8.1.1</kafka.version>
     </properties>
 
     <dependencies>
@@ -43,14 +43,6 @@
             <version>${kafka.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>com.sun.jdmk</groupId>
                     <artifactId>jmxtools</artifactId>
                 </exclusion>
@@ -67,35 +59,46 @@
         <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
-           <version>0.3</version>
+           <version>0.4</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
-                   <groupId>log4j</groupId>
-                   <artifactId>log4j</artifactId>
-
</exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.version}</artifactId> + <version>${kafka.version}</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>2.6.0</version> + <scope>test</scope> </dependency> </dependencies> <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java index a7810b1..a77c941 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java @@ -19,6 +19,7 @@ package org.apache.streams.kafka; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Queues; import com.typesafe.config.Config; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; @@ -28,6 +29,7 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; @@ -65,50 +67,37 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable { private ExecutorService executor = Executors.newSingleThreadExecutor(); public KafkaPersistReader() { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka"))); } - public KafkaPersistReader(Queue<StreamsDatum> persistQueue) { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); - this.persistQueue = persistQueue; + public KafkaPersistReader(KafkaConfiguration config) { + this.config = config; } - public void setConfig(KafkaConfiguration config) { - this.config = config; + 
@Override + public StreamsResultSet readAll() { + return readCurrent(); } @Override public void startStream() { - Properties props = new Properties(); - props.setProperty("serializer.encoding", "UTF8"); - - consumerConfig = new ConsumerConfig(props); - - consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); - - Whitelist topics = new Whitelist(config.getTopic()); - VerifiableProperties vprops = new VerifiableProperties(props); - - inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops)); - for (final KafkaStream stream : inStreams) { executor.submit(new KafkaPersistReaderTask(this, stream)); } - - } - - @Override - public StreamsResultSet readAll() { - return readCurrent(); } @Override public StreamsResultSet readCurrent() { - return null; + + StreamsResultSet current; + + synchronized( KafkaPersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + persistQueue.clear(); + } + + return current; } @Override @@ -126,28 +115,31 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable { return !executor.isShutdown() && !executor.isTerminated(); } - private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { + @Override + public void prepare(Object configurationObject) { + Properties props = new Properties(); - props.put("zookeeper.connect", a_zookeeper); - props.put("group.id", a_groupId); - props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.connect", config.getZkconnect()); + props.put("group.id", "streams"); + props.put("zookeeper.session.timeout.ms", "1000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); - return new ConsumerConfig(props); - } + props.put("auto.offset.reset", "smallest"); - @Override - public void prepare(Object configurationObject) { + consumerConfig = new ConsumerConfig(props); + + consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); + + Whitelist topics = new Whitelist(config.getTopic()); + VerifiableProperties vprops = new VerifiableProperties(props); + inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops)); + + persistQueue = new ConcurrentLinkedQueue<>(); } @Override public void cleanUp() { consumerConnector.shutdown(); - while( !executor.isTerminated()) { - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) {} - } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java index 83493e0..03fa291 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java @@ -18,6 +18,7 @@ package org.apache.streams.kafka; +import com.google.common.base.Preconditions; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.message.MessageAndMetadata; @@ -44,18 +45,17 @@ public class 
KafkaPersistReaderTask implements Runnable { @Override public void run() { - MessageAndMetadata<String,String> item; - while(true) { - - ConsumerIterator<String, String> it = stream.iterator(); - while (it.hasNext()) { - item = it.next(); - reader.persistQueue.add(new StreamsDatum(item.message())); - } - try { - Thread.sleep(new Random().nextInt(100)); - } catch (InterruptedException e) {} + Preconditions.checkNotNull(this.stream); + + ConsumerIterator<String, String> it = stream.iterator(); + while (it.hasNext()) { + MessageAndMetadata<String,String> item = it.next(); + reader.persistQueue.add(new StreamsDatum(item.message())); } + try { + Thread.sleep(new Random().nextInt(100)); + } catch (InterruptedException e) {} + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java index c5f029a..01db32f 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java @@ -36,7 +36,7 @@ import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable { +public class KafkaPersistWriter implements StreamsPersistWriter, Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class); @@ -49,18 +49,10 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R private Producer<String, String> producer; public KafkaPersistWriter() { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - } - - public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) { - Config config = StreamsConfigurator.config.getConfig("kafka"); - this.config = KafkaConfigurator.detectConfiguration(config); - this.persistQueue = persistQueue; + this(KafkaConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("kafka"))); } - public void setConfig(KafkaConfiguration config) { + public KafkaPersistWriter(KafkaConfiguration config) { this.config = config; } @@ -71,6 +63,7 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner"); props.put("request.required.acks", "1"); + props.put("auto.create.topics.enable", "true"); ProducerConfig config = new ProducerConfig(props); @@ -79,27 +72,13 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R new Thread(new KafkaPersistWriterTask(this)).start(); } - public void stop() { - producer.close(); - } - - public void setPersistQueue(Queue<StreamsDatum> persistQueue) { - this.persistQueue = persistQueue; - } - - public Queue<StreamsDatum> getPersistQueue() { - return this.persistQueue; - } - @Override public void write(StreamsDatum entry) { try { String text = mapper.writeValueAsString(entry); - String hash = 
GuidUtils.generateGuid(text); - - KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), hash, text); + KeyedMessage<String, String> data = new KeyedMessage<String, String>(config.getTopic(), entry.getId(), text); producer.send(data); @@ -109,19 +88,15 @@ public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, R } @Override - public void run() { - start(); - - // stop(); - } - - @Override public void prepare(Object configurationObject) { + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + start(); + } @Override public void cleanUp() { - stop(); + producer.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java index 5d8ee9e..4aa9707 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java @@ -38,7 +38,7 @@ public class KafkaPersistWriterTask implements Runnable { public void run() { while(true) { - if( writer.getPersistQueue().peek() != null ) { + if( writer.persistQueue.peek() != null ) { try { StreamsDatum entry = writer.persistQueue.remove(); writer.write(entry); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java index ebfff9a..fa38ca2 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java @@ -24,17 +24,15 @@ package org.apache.streams.kafka; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; -public class StreamsPartitioner implements Partitioner<String> { +public class StreamsPartitioner implements Partitioner { + public StreamsPartitioner (VerifiableProperties props) { } - public int partition(String key, int a_numPartitions) { + public int partition(Object key, int a_numPartitions) { int partition = 0; - int offset = key.lastIndexOf('.'); - if (offset > 0) { - partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions; - } + partition = key.hashCode() % a_numPartitions; return partition; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties b/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties index 967264d..164c990 100644 --- a/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties +++ 
b/streams-contrib/streams-persist-kafka/src/main/resources/reference.properties @@ -20,22 +20,6 @@ kafka.serializer.class=kafka.serializer.DefaultEncoder # allow topic level compression #compressed.topics= -############################# Async Producer ############################# -# maximum time, in milliseconds, for buffering data on the producer queue -#queue.buffering.max.ms= - -# the maximum size of the blocking queue for buffering on the producer -#queue.buffering.max.messages= - -# Timeout for event enqueue: -# 0: events will be enqueued immediately or dropped if the queue is full -# -ve: enqueue will block indefinitely if the queue is full -# +ve: enqueue will block up to this many milliseconds if the queue is full -#queue.enqueue.timeout.ms= - -# the number of messages batched at the producer -#batch.num.messages= - kafka.groupid=kafka kafka.zk.connect=localhost:2181 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java new file mode 100644 index 0000000..167bc8c --- /dev/null +++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaCluster.java @@ -0,0 +1,51 @@ +package org.apache.streams.kafka.test; + +import java.io.IOException; +import java.util.Properties; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.TestUtils; + +import org.apache.curator.test.TestingServer; + +public class TestKafkaCluster { + KafkaServerStartable kafkaServer; + TestingServer zkServer; + + public TestKafkaCluster() throws Exception { + zkServer = new TestingServer(); + KafkaConfig config = getKafkaConfig(zkServer.getConnectString()); + kafkaServer = new KafkaServerStartable(config); + kafkaServer.startup(); + } + + private static KafkaConfig getKafkaConfig(final String + zkConnectString) { + scala.collection.Iterator<Properties> propsI = + TestUtils.createBrokerConfigs(1).iterator(); + assert propsI.hasNext(); + Properties props = propsI.next(); + assert props.containsKey("zookeeper.connect"); + props.put("zookeeper.connect", zkConnectString); + return new KafkaConfig(props); + } + + public String getKafkaBrokerString() { + return String.format("localhost:%d", + kafkaServer.serverConfig().port()); + } + + public String getZkConnectString() { + return zkServer.getConnectString(); + } + + public int getKafkaPort() { + return kafkaServer.serverConfig().port(); + } + + public void stop() throws IOException { + kafkaServer.shutdown(); + zkServer.stop(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/24e427fa/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java new file mode 100644 index 0000000..1d04f74 --- /dev/null +++ b/streams-contrib/streams-persist-kafka/src/test/java/org/apache/streams/kafka/test/TestKafkaPersist.java 
@@ -0,0 +1,101 @@ +package org.apache.streams.kafka.test; + +import com.google.common.collect.Lists; +import kafka.admin.AdminUtils; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import org.I0Itec.zkclient.ZkClient; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.kafka.KafkaConfiguration; +import org.apache.streams.kafka.KafkaPersistReader; +import org.apache.streams.kafka.KafkaPersistWriter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.collection.JavaConversions; +import scala.collection.Seq; + +import java.util.Properties; + +/** + * Created by sblackmon on 10/20/14. + */ +public class TestKafkaPersist { + + private TestKafkaCluster testKafkaCluster; + private KafkaConfiguration testConfiguration; + + private String testTopic = "testTopic"; + + @Before + public void prepareTest() { + + + try { + testKafkaCluster = new TestKafkaCluster(); + } catch (Throwable e ) { + e.printStackTrace(); + } + + testConfiguration = new KafkaConfiguration(); + testConfiguration.setBrokerlist(testKafkaCluster.getKafkaBrokerString()); + testConfiguration.setZkconnect(testKafkaCluster.getZkConnectString()); + testConfiguration.setTopic(testTopic); + + ZkClient zkClient = new ZkClient(testKafkaCluster.getZkConnectString(), 1000, 1000, ZKStringSerializer$.MODULE$); + + AdminUtils.createTopic(zkClient, testTopic, 1, 1, new Properties()); + } + + @Test + public void testPersistWriterString() { + + assert(testConfiguration != null); + assert(testKafkaCluster != null); + + KafkaPersistWriter testPersistWriter = new KafkaPersistWriter(testConfiguration); + testPersistWriter.prepare(null); + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + //Handle exception + } + + String testJsonString = "{\"dummy\":\"true\"}"; + + testPersistWriter.write(new StreamsDatum(testJsonString, "test")); + + testPersistWriter.cleanUp(); + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + //Handle exception + } + + KafkaPersistReader testPersistReader = new KafkaPersistReader(testConfiguration); + try { + testPersistReader.prepare(null); + } catch( Throwable e ) { + e.printStackTrace(); + Assert.fail(); + } + + testPersistReader.startStream(); + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + //Handle exception + } + + StreamsResultSet testResult = testPersistReader.readCurrent(); + + testPersistReader.cleanUp(); + + assert(testResult.size() == 1); + + } +}
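
For reference, the write/read round trip that TestKafkaPersist exercises above can be driven directly through the new KafkaConfiguration constructor and the prepare/startStream/readCurrent lifecycle. This is a minimal sketch, assuming a broker is already running and the topic already exists (the test creates both); the class name below is illustrative only.

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.core.StreamsResultSet;
    import org.apache.streams.kafka.KafkaConfiguration;
    import org.apache.streams.kafka.KafkaPersistReader;
    import org.apache.streams.kafka.KafkaPersistWriter;

    public class KafkaPersistRoundTrip {
        public static void main(String[] args) throws Exception {
            // Build the configuration in code; the no-arg constructors instead pull
            // the same values from the kafka.* settings shown in the README.
            KafkaConfiguration config = new KafkaConfiguration();
            config.setBrokerlist("localhost:9092");
            config.setZkconnect("localhost:2181");
            config.setTopic("topic");

            // prepare() starts the producer and the background KafkaPersistWriterTask;
            // write() serializes the datum to JSON and sends it keyed by the datum id.
            KafkaPersistWriter writer = new KafkaPersistWriter(config);
            writer.prepare(null);
            writer.write(new StreamsDatum("{\"dummy\":\"true\"}", "test"));
            writer.cleanUp();

            // prepare() builds the consumer connector and message streams;
            // startStream() submits a KafkaPersistReaderTask per stream, and
            // readCurrent() drains whatever has reached the persist queue so far.
            KafkaPersistReader reader = new KafkaPersistReader(config);
            reader.prepare(null);
            reader.startStream();
            Thread.sleep(1000); // give the consumer a moment, as the test does
            StreamsResultSet result = reader.readCurrent();
            System.out.println(result.size());
            reader.cleanUp();
        }
    }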
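
TestKafkaCluster can be reused by other integration tests. A rough sketch of the setup TestKafkaPersist performs (start an in-process ZooKeeper and broker, then pre-create the topic through AdminUtils), with the class name here again being illustrative:

    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.streams.kafka.test.TestKafkaCluster;

    public class EmbeddedKafkaSetup {
        public static void main(String[] args) throws Exception {
            // Starts a curator TestingServer (ZooKeeper) and a single Kafka broker.
            TestKafkaCluster cluster = new TestKafkaCluster();

            // Pre-create the topic the same way the test does.
            ZkClient zkClient = new ZkClient(cluster.getZkConnectString(), 1000, 1000,
                    ZKStringSerializer$.MODULE$);
            AdminUtils.createTopic(zkClient, "testTopic", 1, 1, new Properties());

            // Point a KafkaPersistWriter/KafkaPersistReader at
            // cluster.getKafkaBrokerString() and cluster.getZkConnectString() here.

            cluster.stop();
        }
    }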
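
One note on the new StreamsPartitioner signature: the producer now hands it the message key as an Object (the datum id written by KafkaPersistWriter), and the partition is derived from that key's hash. A small illustration of the committed logic, assuming a VerifiableProperties built from an empty Properties object:

    import java.util.Properties;

    import kafka.utils.VerifiableProperties;
    import org.apache.streams.kafka.StreamsPartitioner;

    public class PartitionerExample {
        public static void main(String[] args) {
            StreamsPartitioner partitioner =
                    new StreamsPartitioner(new VerifiableProperties(new Properties()));
            // Evaluates to "test".hashCode() % 4. Note that hashCode() can be
            // negative for some keys, and the modulo here does not normalize it.
            int partition = partitioner.partition("test", 4);
            System.out.println(partition);
        }
    }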
