Author: enridaga
Date: Fri Nov 18 13:37:29 2011
New Revision: 1203647
URL: http://svn.apache.org/viewvc?rev=1203647&view=rev
Log:
Services tests. Added concurrency tests on background jobs (start/ping)
(STANBOL-379, STANBOL-343)
Added:
incubator/stanbol/branches/lto-reasoners/reasoners/services-tests/src/test/java/org/apache/stanbol/reasoners/it/concurrency/
incubator/stanbol/branches/lto-reasoners/reasoners/services-tests/src/test/java/org/apache/stanbol/reasoners/it/concurrency/ReasonersConcurrencyTest.java
Added:
incubator/stanbol/branches/lto-reasoners/reasoners/services-tests/src/test/java/org/apache/stanbol/reasoners/it/concurrency/ReasonersConcurrencyTest.java
URL:
http://svn.apache.org/viewvc/incubator/stanbol/branches/lto-reasoners/reasoners/services-tests/src/test/java/org/apache/stanbol/reasoners/it/concurrency/ReasonersConcurrencyTest.java?rev=1203647&view=auto
==============================================================================
---
incubator/stanbol/branches/lto-reasoners/reasoners/services-tests/src/test/java/org/apache/stanbol/reasoners/it/concurrency/ReasonersConcurrencyTest.java
(added)
+++
incubator/stanbol/branches/lto-reasoners/reasoners/services-tests/src/test/java/org/apache/stanbol/reasoners/it/concurrency/ReasonersConcurrencyTest.java
Fri Nov 18 13:37:29 2011
@@ -0,0 +1,260 @@
+package org.apache.stanbol.reasoners.it.concurrency;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.stanbol.reasoners.test.ReasonersTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * To test background jobs with parallel client requests.
+ *
+ */
+public class ReasonersConcurrencyTest extends ReasonersTestBase{
+
+ private static final String TEST_URL =
"http://xmlns.com/foaf/spec/index.rdf";
+
+ // The number of jobs to start is = (services * tasks * SCALE)
+ // Increase this to multiply the number of calls
+ private static final int SCALE = 1;
+
+ private ExecutorService executor;
+
+ static Logger log =
LoggerFactory.getLogger(ReasonersConcurrencyTest.class);
+
+ static int counter = 0;
+
+ @Before
+ public void setUp() throws Exception {
+ executor = Executors.newCachedThreadPool();
+ counter = 0;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ throw new Exception("Timeout while waiting for termination");
+ }
+ log.info("Done {} calls",counter);
+ counter = 0;
+ }
+
+ /**
+ * Execute a set of http calls to start jobs.
+ * Then ping the jobs until they are done.
+ *
+ * Both starts and pings are executed as set of parallel threads.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void basicConcurrencyTest() throws Exception{
+ log.info("basicConcurrencyTest()");
+ // We start in parallel a set of background jobs
+ List<JobClient> tasks = buildStarters();
+
+ List<String> jids = new ArrayList<String>();
+ List<Future<Result>> futures = executor.invokeAll(tasks);
+ for (Future<Result> future : futures) {
+ String j = future.get().assertResult().getContentString();
+ log.info("Got job id: {}",j);
+ jids.add(j);
+ }
+
+ // We ping in parallel all jobs.
+ // On each iteration, we prepare a new set of calls only on jobs
+ // which are not terminated
+ List<String> done = new ArrayList<String>();
+ while((done.size() < jids.size())){
+ // List of calls
+ tasks = new ArrayList<JobClient>();
+ // Prepare a Pinger on each unfinished job
+ for(String j : jids){
+ if(!done.contains(j)){
+ tasks.add(new Pinger(j));
+ }
+ }
+ // Invoke all unfinished jobs in parallel
+ futures = executor.invokeAll(tasks);
+ // Query each response
+ for (Future<Result> future : futures) {
+ PingerResult pr = (PingerResult) future.get();
+ String r = pr.assertResult().getContentString();
+ String jid = pr.jid();
+ if(!r.equals("Job is still working")){
+ log.info("{} is done!", jid);
+ done.add(jid);
+ }else{
+ log.info("{} is still working ... ",jid);
+ }
+ }
+ }
+ }
+
+ /**
+ * Builds a list of calls which start jobs.
+ * By default, returns a JobCLient for each service on each task.
+ * Do this SCALE times.
+ *
+ * @return
+ */
+ private List<JobClient> buildStarters(){
+ List<JobClient> tasks = new ArrayList<JobClient>();
+ for(int i= 0; i<SCALE; i++){
+ for(String s : allServices()){
+ for(String t: TASKS){
+ tasks.add(new Starter(s, t, "url", TEST_URL));
+ }
+ }
+ }
+ return tasks;
+ }
+
+ /***********************************************************
+ * Utility classes & methods
+ ***********************************************************/
+ private abstract class JobClient implements Callable<Result> {
+ abstract URI uri(String queryString);
+
+ protected HttpResponse get() throws Exception{
+ return get(new String[0]);
+ }
+
+ protected HttpResponse get(String... params) throws Exception{
+ HttpClient client = new DefaultHttpClient();
+ HttpGet request = new HttpGet();
+ request.setURI(uri(buildQuerystring(params)));
+
+ log.debug("Sending request[{}]: {} ",counter,
request.getURI().toString());
+
+ // Increment global counter
+ counter++;
+
+ return client.execute(request);
+ }
+
+ protected String buildQuerystring(String... params){
+ StringBuilder qsb = new StringBuilder();
+
+ for(int i=0; i<params.length; i=i+2){
+ if(i==0){
+ qsb.append("?");
+ }else{
+ qsb.append("&");
+ }
+ qsb.append(params[i]);
+ qsb.append("=");
+ qsb.append(params[i+1]);
+ }
+ return qsb.toString();
+ }
+ }
+
+ private class Pinger extends JobClient {
+ String jid = null;
+
+ Pinger(String jid){
+ this.jid = jid;
+ }
+
+ URI uri(String queryString){
+ return
URI.create(ReasonersConcurrencyTest.this.builder.buildUrl(REASONERS_PATH+"/jobs/ping/"+jid+queryString));
+ }
+
+ public PingerResult call() throws Exception {
+ return new PingerResult(jid, get());
+ }
+ }
+
+ private class Starter extends JobClient {
+ String service = null;
+ String task = null;
+ String[] queryString = null;
+
+ Starter(String service, String task, String... queryParameters){
+ this.service = service;
+ this.task = task;
+ this.queryString = queryParameters;
+ }
+
+ URI uri(String queryString) {
+ return
URI.create(ReasonersConcurrencyTest.this.builder.buildUrl(REASONERS_PATH +
service
+ +
task + "/job" + queryString));
+ }
+
+ @Override
+ public StarterResult call() throws Exception {
+ return new StarterResult(get(queryString));
+ }
+ }
+
+ abstract class Result {
+ protected HttpResponse response = null;
+
+ Result(HttpResponse response){
+ this.response = response;
+ }
+ abstract Result assertResult();
+
+ public HttpResponse getResponse(){
+ return response;
+ }
+ public String getContentString() throws IllegalStateException,
IOException{
+ return IOUtils.toString(this.response.getEntity().getContent());
+ }
+ }
+
+ private class PingerResult extends Result {
+ private String jid= null;
+ PingerResult(String jid, HttpResponse response){
+ super(response);
+ this.jid = jid;
+ }
+
+ String jid(){
+ return jid;
+ }
+ @Override
+ public Result assertResult() {
+ // Result of a ping request must be 200
+ assertNotNull(this.toString(), response);
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ return this;
+ }
+ }
+
+ private class StarterResult extends Result {
+
+ StarterResult(HttpResponse response){
+ super(response);
+ }
+
+ @Override
+ public Result assertResult() {
+ // Result of a start request must be 200
+ assertNotNull(this.toString(), response);
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ return this;
+ }
+ }
+}