http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java index d78df94..63c75ef 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java @@ -18,20 +18,33 @@ */ package com.datatorrent.lib.db.jdbc; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.Operator.ProcessingMode; -import com.datatorrent.netlet.util.DTThrowable; -import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*; import com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.TestEvent; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.netlet.util.DTThrowable; + +import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID; +import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID; +import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.TABLE_NAME; import static com.datatorrent.lib.db.jdbc.JdbcOperatorTest.DB_DRIVER; import static com.datatorrent.lib.db.jdbc.JdbcOperatorTest.URL; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import java.sql.*; -import java.util.Random; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Test for {@link AbstractJdbcNonTransactionableBatchOutputOperator} @@ -54,17 +67,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest try { Class.forName(DB_DRIVER).newInstance(); con = DriverManager.getConnection(URL); - } - catch (SQLException ex) { - DTThrowable.rethrow(ex); - } - catch (ClassNotFoundException ex) { - DTThrowable.rethrow(ex); - } - catch (InstantiationException ex) { - DTThrowable.rethrow(ex); - } - catch (IllegalAccessException ex) { + } catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException ex) { DTThrowable.rethrow(ex); } } @@ -77,8 +80,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest try { con.close(); - } - catch (SQLException ex) { + } catch (SQLException ex) { DTThrowable.rethrow(ex); } } @@ -108,8 +110,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest int count = resultSet.getInt(1); stmt.close(); return count; - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("fetching count", e); } } @@ -145,55 +146,40 @@ public class JdbcNonTransactionalBatchOutputOperatorTest outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < HALF_BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 1, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should not be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should not be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(2); - for(int batchCounter = 0; - batchCounter < HALF_BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 2, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should not be written", - 2 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should not be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.teardown(); } @@ -207,36 +193,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 2 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.getStore().disconnect(); @@ -249,46 +225,33 @@ public class JdbcNonTransactionalBatchOutputOperatorTest OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); outputOperator.setup(context); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 2* BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 2 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 1, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 3 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 3 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); } @Test @@ -300,36 +263,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < HALF_BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.getStore().disconnect(); @@ -339,49 +292,38 @@ public class JdbcNonTransactionalBatchOutputOperatorTest attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L); attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + outputOperator.setup(context); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < HALF_BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 1, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 2 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); } @Test @@ -393,35 +335,25 @@ public class JdbcNonTransactionalBatchOutputOperatorTest outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 2 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.getStore().disconnect(); @@ -436,20 +368,15 @@ public class JdbcNonTransactionalBatchOutputOperatorTest outputOperator.beginWindow(2); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 2, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 3 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 3 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); } @Test @@ -461,36 +388,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest outputOperator.beginWindow(0); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(1); - for(int batchCounter = 0; - batchCounter < HALF_BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.getStore().disconnect(); @@ -503,28 +420,20 @@ public class JdbcNonTransactionalBatchOutputOperatorTest OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); outputOperator.setup(context); - Assert.assertEquals("Commit window id ", - 0, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); outputOperator.beginWindow(2); - for(int batchCounter = 0; - batchCounter < BATCH_SIZE; - batchCounter++) { + for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) { outputOperator.input.put(new TestEvent(random.nextInt())); } outputOperator.endWindow(); - Assert.assertEquals("Commit window id ", - 2, - outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); - Assert.assertEquals("Batch should be written", - 2 * BATCH_SIZE, - outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); + Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID)); + Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE, + outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection)); } }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java index d539aaa..9880aae 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java @@ -32,10 +32,11 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Lists; + import com.datatorrent.api.DAG; -import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.google.common.collect.Lists; +import com.datatorrent.netlet.util.DTThrowable; /** * Test for {@link AbstractJdbcNonTransactionableOutputOperator Operator} @@ -70,8 +71,7 @@ public class JdbcNonTransactionalOutputOperatorTest String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)"; stmt.executeUpdate(createTable); - } - catch (Throwable e) { + } catch (Throwable e) { DTThrowable.rethrow(e); } } @@ -84,8 +84,7 @@ public class JdbcNonTransactionalOutputOperatorTest String cleanTable = "delete from " + TABLE_NAME; stmt.executeUpdate(cleanTable); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -122,12 +121,11 @@ public class JdbcNonTransactionalOutputOperatorTest String countQuery = "SELECT * FROM " + TABLE_NAME; ResultSet resultSet = stmt.executeQuery(countQuery); int count = 0; - while(resultSet.next()) { + while (resultSet.next()) { count++; } return count; - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("fetching count", e); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java index 56359fb..ef8f9a0 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java @@ -32,8 +32,7 @@ public class JdbcNonTransactionalStoreTest JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore(); try { jdbcNonTransactionalStore.beginTransaction(); - } - catch(RuntimeException e) { + } catch (RuntimeException e) { return; } Assert.fail("Exception should be thrown"); @@ -45,8 +44,7 @@ public class JdbcNonTransactionalStoreTest JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore(); try { jdbcNonTransactionalStore.commitTransaction(); - } - catch(RuntimeException e) { + } catch (RuntimeException e) { return; } Assert.fail("Exception should be thrown"); @@ -58,8 +56,7 @@ public class JdbcNonTransactionalStoreTest JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore(); try { jdbcNonTransactionalStore.rollbackTransaction(); - } - catch(RuntimeException e) { + } catch (RuntimeException e) { return; } Assert.fail("Exception should be thrown"); @@ -71,8 +68,7 @@ public class JdbcNonTransactionalStoreTest JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore(); try { jdbcNonTransactionalStore.isInTransaction(); - } - catch(RuntimeException e) { + } catch (RuntimeException e) { return; } Assert.fail("Exception should be thrown"); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java index d9daf97..98be88c 100644 --- a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java +++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java @@ -25,18 +25,22 @@ import java.io.PrintStream; import java.util.Date; import java.util.List; -import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; + +import com.google.common.collect.Lists; import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; import com.datatorrent.lib.util.TestUtils.TestInfo; -import com.google.common.collect.Lists; public class JsonFormatterTest { @@ -150,7 +154,7 @@ public class JsonFormatterTest Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}"; - System.out.println(validDataSink.collectedTuples.get(0)); + LOG.debug("{}", validDataSink.collectedTuples.get(0)); Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0)); } @@ -199,4 +203,6 @@ public class JsonFormatterTest { } + private static final Logger LOG = LoggerFactory.getLogger(JsonFormatterTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java index 50ed3bd..bb51ca4 100644 --- a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java +++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java @@ -33,12 +33,13 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import com.datatorrent.lib.parser.XmlParser; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; @@ -76,6 +77,7 @@ public class XmlFormatterTest } } + @Test public void testOperatorSerialization() { @@ -162,7 +164,7 @@ public class XmlFormatterTest operator.setup(null); operator.in.process(e); - System.out.println(validDataSink.collectedTuples.get(0)); + LOG.debug("{}", validDataSink.collectedTuples.get(0)); Assert.assertEquals(1, validDataSink.collectedTuples.size()); Assert.assertEquals(0, invalidDataSink.collectedTuples.size()); String expected = "<EmployeeBean>" + "<name>john</name>" @@ -202,7 +204,7 @@ public class XmlFormatterTest } - @XmlType (propOrder={"name","dept","eid", "dateOfJoining", "address"}) + @XmlType(propOrder = {"name", "dept", "eid", "dateOfJoining", "address"}) public static class EmployeeBean { @@ -292,4 +294,6 @@ public class XmlFormatterTest } + private static final Logger LOG = LoggerFactory.getLogger(XmlFormatterTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java index d8138d5..2ece6b2 100644 --- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java +++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java @@ -34,7 +34,7 @@ import com.datatorrent.api.Context.OperatorContext; */ public class OperatorContextTestHelper { - private final static ThreadLocal<DateFormat> DATE_FORMAT_THREAD_LOCAL = new ThreadLocal<DateFormat>() + private static final ThreadLocal<DateFormat> DATE_FORMAT_THREAD_LOCAL = new ThreadLocal<DateFormat>() { @Override protected DateFormat initialValue() http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java b/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java index 2cc0e1d..9d501aa 100644 --- a/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java +++ b/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java @@ -65,11 +65,9 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet if (topic != null) { subscriber = this; } - } - else if (type.equals("unsubscribe")) { + } else if (type.equals("unsubscribe")) { subscriber = null; - } - else if (type.equals("publish")) { + } else if (type.equals("publish")) { Object data = map.get("data"); if (data != null) { if (subscriber != null) { @@ -77,8 +75,7 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet } } } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.warn("Data read error", ex); } } @@ -109,8 +106,7 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet map.put("data", data); try { webSocket.connection.sendMessage(mapper.writeValueAsString(map)); - } - catch (IOException ex) { + } catch (IOException ex) { LOG.warn("Connection send error", ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java b/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java index e619ff8..a2c021e 100644 --- a/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java @@ -21,7 +21,6 @@ package com.datatorrent.lib.io; import org.junit.Assert; import org.junit.Test; -import com.datatorrent.lib.io.ApacheGenRandomLogs; import com.datatorrent.lib.testbench.CollectorTestSink; /** @@ -29,37 +28,39 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class ApacheRandomLogsTest { - @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) @Test - public void test() - { - ApacheGenRandomLogs oper = new ApacheGenRandomLogs(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - oper.setup(null); + public void test() + { + ApacheGenRandomLogs oper = new ApacheGenRandomLogs(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + oper.setup(null); - Thread t = new EmitTuples(oper); - t.start(); - try - { - Thread.sleep(1000); - } catch (InterruptedException e) - { - } - t.stop(); - Assert.assertTrue("Tuples emitted", sink.collectedTuples.size() > 0); - } + Thread t = new EmitTuples(oper); + t.start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + //Fixme + } + t.stop(); + Assert.assertTrue("Tuples emitted", sink.collectedTuples.size() > 0); + } - private class EmitTuples extends Thread { - private ApacheGenRandomLogs oper; - public EmitTuples(ApacheGenRandomLogs oper) - { - this.oper = oper; - } - @Override - public void run() - { - oper.emitTuples(); - } - } + private class EmitTuples extends Thread + { + private ApacheGenRandomLogs oper; + + public EmitTuples(ApacheGenRandomLogs oper) + { + this.oper = oper; + } + + @Override + public void run() + { + oper.emitTuples(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java index ada1148..959e25e 100644 --- a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java @@ -29,7 +29,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.commons.io.IOUtils; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.eclipse.jetty.server.Handler; @@ -39,6 +38,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.io.IOUtils; + import com.datatorrent.lib.testbench.CollectorTestSink; /** @@ -74,8 +75,7 @@ public class HttpJsonChunksInputOperatorTest response.getOutputStream().println(); response.getOutputStream().println(0); response.getOutputStream().flush(); - } - catch (JSONException e) { + } catch (JSONException e) { response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error generating response: " + e.toString()); } @@ -88,8 +88,6 @@ public class HttpJsonChunksInputOperatorTest server.start(); String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext"; - System.out.println(url); - final AbstractHttpInputOperator operator = new HttpJsonChunksInputOperator(); CollectorTestSink sink = new CollectorTestSink(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java index 10ec6c2..be405ab 100644 --- a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java @@ -28,7 +28,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.commons.io.IOUtils; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; @@ -36,6 +35,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.io.IOUtils; + import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; @@ -79,7 +80,6 @@ public class HttpLinesInputOperatorTest server.start(); String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext"; - System.out.println(url); final HttpLinesInputOperator operator = new HttpLinesInputOperator(); CollectorTestSink<String> sink = TestUtils.setSink(operator.outputPort, new CollectorTestSink<String>()); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java index b321cfa..927317e 100644 --- a/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java @@ -28,9 +28,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MultivaluedMap; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.core.util.MultivaluedMapImpl; - import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; @@ -38,6 +35,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Assert; import org.junit.Test; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.core.util.MultivaluedMapImpl; + import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java index eb69bdf..afe518b 100644 --- a/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java @@ -31,7 +31,6 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; import javax.ws.rs.core.MediaType; -import org.apache.commons.io.IOUtils; import org.codehaus.jettison.json.JSONObject; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; @@ -40,6 +39,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.io.IOUtils; + /** * Functional test for {@link com.datatorrent.lib.io.HttpPostOutputOperator}. */ @@ -75,8 +76,6 @@ public class HttpPostOutputOperatorTest server.start(); String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext"; - System.out.println("url: " + url); - HttpPostOutputOperator<Object> node = new HttpPostOutputOperator<Object>(); node.setUrl(url); @@ -95,7 +94,6 @@ public class HttpPostOutputOperatorTest } Assert.assertEquals("number requests", 1, receivedMessages.size()); - System.out.println(receivedMessages.get(0)); JSONObject json = new JSONObject(data); Assert.assertTrue("request body " + receivedMessages.get(0), receivedMessages.get(0).contains(json.toString())); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java index 4b29830..acb3fc4 100644 --- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java @@ -79,8 +79,7 @@ public class IdempotentStorageManagerTest storageManager.teardown(); try { FileUtils.deleteDirectory(new File("target/" + description.getClassName())); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -105,7 +104,7 @@ public class IdempotentStorageManagerTest testMeta.storageManager.save(data, 1, 1); testMeta.storageManager.setup(testMeta.context); @SuppressWarnings("unchecked") - Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageManager.load(1, 1); + Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1); Assert.assertEquals("dataOf1", data, decoded); } @@ -130,8 +129,7 @@ public class IdempotentStorageManagerTest for (Integer operatorId : decodedStates.keySet()) { if (operatorId == 1) { Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1)); - } - else { + } else { Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2)); } } @@ -182,8 +180,7 @@ public class IdempotentStorageManagerTest testMeta.storageManager.save(dataOf2, 2, 1); testMeta.storageManager.save(dataOf3, 3, 1); - testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager), - Sets.newHashSet(2, 3)); + testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager), Sets.newHashSet(2, 3)); testMeta.storageManager.setup(testMeta.context); testMeta.storageManager.deleteUpTo(1, 6); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java index 43f9186..7801619 100644 --- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java @@ -34,8 +34,7 @@ public abstract class PubSubWebSocketAppDataOperatorTest public static final URI GATEWAY_CONNECT_ADDRESS; public static final URI URI_ADDRESS; - static - { + static { try { GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub"); URI_ADDRESS = new URI(URI_ADDRESS_STRING); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java index 3dc5be3..fc92429 100644 --- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java @@ -22,13 +22,11 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import com.datatorrent.lib.helper.OperatorContextTestHelper; - import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; - import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider; +import com.datatorrent.lib.helper.OperatorContextTestHelper; public class PubSubWebSocketAppDataQueryTest extends PubSubWebSocketAppDataOperatorTest { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java index 402bb34..e165649 100644 --- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java @@ -50,7 +50,7 @@ public class PubSubWebSocketOperatorTest contextHandler.addServlet(sh, "/pubsub"); contextHandler.addServlet(sh, "/*"); server.start(); - Connector connector[] = server.getConnectors(); + Connector[] connector = server.getConnectors(); URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub"); PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>(); @@ -100,10 +100,10 @@ public class PubSubWebSocketOperatorTest Assert.assertTrue("tuples emitted", sink.collectedTuples.size() > 1); @SuppressWarnings("unchecked") - Map<String, String> tuple = (Map<String, String>) sink.collectedTuples.get(0); + Map<String, String> tuple = (Map<String, String>)sink.collectedTuples.get(0); Assert.assertEquals("Expects {\"hello\":\"world\"} as data", "world", tuple.get("hello")); - String stringResult = (String) sink.collectedTuples.get(1); + String stringResult = (String)sink.collectedTuples.get(1); Assert.assertEquals("Expects {\"hello\":\"world\"} as data", stringData, stringResult); inputOperator.deactivate(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java index b4a649a..6bd839d 100644 --- a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java @@ -26,20 +26,22 @@ import javax.mail.Message; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; -import org.apache.hadoop.conf.Configuration; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.StreamingApplication; +import org.apache.hadoop.conf.Configuration; + import com.google.common.collect.Maps; import com.icegreen.greenmail.util.GreenMail; import com.icegreen.greenmail.util.ServerSetup; import com.icegreen.greenmail.util.ServerSetupTest; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + public class SmtpOutputOperatorTest { @@ -97,7 +99,7 @@ public class SmtpOutputOperatorTest String expectedContent = content.replace("{}", data.toString()).trim(); Assert.assertTrue(expectedContent.equals(receivedContent)); - Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress()); + Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress()); Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString()); Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString()); Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString()); @@ -121,7 +123,7 @@ public class SmtpOutputOperatorTest String expectedContent = content.replace("{}", data.toString()).trim(); Assert.assertTrue(expectedContent.equals(receivedContent)); - Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress()); + Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress()); Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString()); } @@ -139,7 +141,8 @@ public class SmtpOutputOperatorTest conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc); final AtomicReference<SmtpOutputOperator> o1 = new AtomicReference<SmtpOutputOperator>(); - StreamingApplication app = new StreamingApplication() { + StreamingApplication app = new StreamingApplication() + { @Override public void populateDAG(DAG dag, Configuration conf) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java index 79d780d..a79ace7 100644 --- a/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java @@ -87,9 +87,8 @@ public class SocketInputOperatorTest reader.close(); clientChannel.close(); } - } - catch (Exception e) { - // LOG.debug("server ", e); + } catch (Exception e) { + //fixme } } } @@ -118,16 +117,15 @@ public class SocketInputOperatorTest operator.endWindow(); operator.deactivate(); operator.teardown(); - String outputString = (String) sink.collectedTuples.get(0); + String outputString = (String)sink.collectedTuples.get(0); Assert.assertEquals(strBuffer.substring(0, outputString.length()), sink.collectedTuples.get(0)); int length = outputString.length(); - outputString = (String) sink.collectedTuples.get(1); + outputString = (String)sink.collectedTuples.get(1); Assert.assertEquals(strBuffer.substring(length, length + outputString.length()), sink.collectedTuples.get(1)); server.interrupt(); server.join(); Thread.sleep(1000); - } - catch (Exception e) { + } catch (Exception e) { LOG.debug("exception", e); } } @@ -161,15 +159,14 @@ public class SocketInputOperatorTest int endIndex = 0; int start = 0; for (int i = 0; i < 10; i++) { - endIndex += ((String) sink.collectedTuples.get(i)).length(); + endIndex += ((String)sink.collectedTuples.get(i)).length(); Assert.assertEquals(strBuffer.substring(start, endIndex), sink.collectedTuples.get(i)); start = endIndex; } server.interrupt(); server.join(); Thread.sleep(1000); - } - catch (Exception e) { + } catch (Exception e) { LOG.debug("exception", e); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java index 184a6bd..5ce5276 100644 --- a/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java @@ -19,19 +19,18 @@ package com.datatorrent.lib.io; import java.net.URI; - import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Lists; - import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketClient; import org.eclipse.jetty.websocket.WebSocketClientFactory; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Lists; + public class WebSocketServerInputOperatorTest { @Test @@ -57,11 +56,10 @@ public class WebSocketServerInputOperatorTest long startTime = System.currentTimeMillis(); - while(startTime + 10000 > System.currentTimeMillis()) { - if(TestWSSIO.messages.size() >= 1) { + while (startTime + 10000 > System.currentTimeMillis()) { + if (TestWSSIO.messages.size() >= 1) { break; } - Thread.sleep(100); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java index b50fe29..d11125b 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java @@ -18,23 +18,35 @@ */ package com.datatorrent.lib.io.fs; -import com.datatorrent.api.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashSet; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; import com.datatorrent.lib.util.TestUtils.TestInfo; -import com.google.common.collect.*; -import java.io.*; -import java.util.*; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.*; -import org.junit.*; public class AbstractFileInputOperatorFailureHandlingTest { - @Rule public TestInfo testMeta = new TestInfo(); + @Rule + public TestInfo testMeta = new TestInfo(); public static class TestFileInputOperator extends AbstractFileInputOperator<String> { @@ -60,7 +72,8 @@ public class AbstractFileInputOperatorFailureHandlingTest br = null; } - @Override protected InputStream retryFailedFile(FailedFile ff) throws IOException + @Override + protected InputStream retryFailedFile(FailedFile ff) throws IOException { count = 0; return super.retryFailedFile(ff); @@ -90,13 +103,13 @@ public class AbstractFileInputOperatorFailureHandlingTest FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.getDir()).getAbsolutePath()), true); HashSet<String> allLines = Sets.newHashSet(); // Create files with 100 records. - for (int file=0; file<10; file++) { + for (int file = 0; file < 10; file++) { HashSet<String> lines = Sets.newHashSet(); - for (int line=0; line<10; line++) { - lines.add("f"+file+"l"+line); + for (int line = 0; line < 10; line++) { + lines.add("f" + file + "l" + line); } allLines.addAll(lines); - FileUtils.write(new File(testMeta.getDir(), "file"+file), StringUtils.join(lines, '\n')); + FileUtils.write(new File(testMeta.getDir(), "file" + file), StringUtils.join(lines, '\n')); } Thread.sleep(10); @@ -104,15 +117,16 @@ public class AbstractFileInputOperatorFailureHandlingTest TestFileInputOperator oper = new TestFileInputOperator(); CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.getDir()); oper.getScanner().setFilePatternRegexp(".*file[\\d]"); - oper.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap())); - for (long wid=0; wid<1000; wid++) { + oper.setup( + new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap())); + for (long wid = 0; wid < 1000; wid++) { oper.beginWindow(wid); oper.emitTuples(); oper.endWindow(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index 3a8661c..ea16185 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -86,34 +86,35 @@ public class AbstractFileInputOperatorTest } } - @Rule public TestMeta testMeta = new TestMeta(); + @Rule + public TestMeta testMeta = new TestMeta(); @Test public void testSinglePartiton() throws Exception { FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); HashSet<String> allLines = Sets.newHashSet(); - for (int file=0; file<2; file++) { + for (int file = 0; file < 2; file++) { HashSet<String> lines = Sets.newHashSet(); - for (int line=0; line<2; line++) { - lines.add("f"+file+"l"+line); + for (int line = 0; line < 2; line++) { + lines.add("f" + file + "l" + line); } allLines.addAll(lines); - FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n')); + FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n')); } LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); oper.getScanner().setFilePatternRegexp(".*file[\\d]"); oper.setup(testMeta.context); - for (long wid=0; wid<3; wid++) { + for (long wid = 0; wid < 3; wid++) { oper.beginWindow(wid); oper.emitTuples(); oper.endWindow(); @@ -133,8 +134,8 @@ public class AbstractFileInputOperatorTest Path path = new Path(new File(testMeta.dir).getAbsolutePath()); FileContext.getLocalFSFileContext().delete(path, true); - for (int file=0; file<4; file++) { - FileUtils.write(new File(testMeta.dir, "partition00"+file), ""); + for (int file = 0; file < 4; file++) { + FileUtils.write(new File(testMeta.dir, "partition00" + file), ""); } FileSystem fs = FileSystem.get(FileContext.getLocalFSFileContext().getDefaultFileSystem().getUri(), new Configuration()); @@ -158,13 +159,14 @@ public class AbstractFileInputOperatorTest Path path = new Path(new File(testMeta.dir).getAbsolutePath()); FileContext.getLocalFSFileContext().delete(path, true); - for (int file=0; file<4; file++) { - FileUtils.write(new File(testMeta.dir, "partition00"+file), ""); + for (int file = 0; file < 4; file++) { + FileUtils.write(new File(testMeta.dir, "partition00" + file), ""); } List<Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList(); partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper)); - Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions, new PartitioningContextImpl(null, 2)); + Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions, + new PartitioningContextImpl(null, 2)); Assert.assertEquals(2, newPartitions.size()); Assert.assertEquals(1, oper.getCurrentPartitions()); // partitioned() wasn't called @@ -202,20 +204,20 @@ public class AbstractFileInputOperatorTest Path path = new Path(new File(testMeta.dir).getAbsolutePath()); FileContext.getLocalFSFileContext().delete(path, true); int file; - for (file=0; file<4; file++) { - FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n"); + for (file = 0; file < 4; file++) { + FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n"); } CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); int wid = 0; // Read all records to populate processedList in operator. oper.setup(testMeta.context); - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { oper.beginWindow(wid); oper.emitTuples(); oper.endWindow(); @@ -233,7 +235,7 @@ public class AbstractFileInputOperatorTest partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper)); // incremental capacity controlled partitionCount property Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = initialState.definePartitions(partitions, - new PartitioningContextImpl(null, 0)); + new PartitioningContextImpl(null, 0)); Assert.assertEquals(2, newPartitions.size()); Assert.assertEquals(1, initialState.getCurrentPartitions()); Map<Integer, Partition<AbstractFileInputOperator<String>>> m = Maps.newHashMap(); @@ -253,8 +255,8 @@ public class AbstractFileInputOperatorTest } sink.clear(); - for(int i = 0; i < 10; i++) { - for(AbstractFileInputOperator<String> o : opers) { + for (int i = 0; i < 10; i++) { + for (AbstractFileInputOperator<String> o : opers) { o.beginWindow(wid); o.emitTuples(); o.endWindow(); @@ -266,12 +268,12 @@ public class AbstractFileInputOperatorTest Assert.assertEquals("No new tuples read ", 0, sink.collectedTuples.size()); // Add four new files with 3 records each. - for (; file<8; file++) { - FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n"); + for (; file < 8; file++) { + FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n"); } - for(int i = 0; i < 10; i++) { - for(AbstractFileInputOperator<String> o : opers) { + for (int i = 0; i < 10; i++) { + for (AbstractFileInputOperator<String> o : opers) { o.beginWindow(wid); o.emitTuples(); o.endWindow(); @@ -306,20 +308,20 @@ public class AbstractFileInputOperatorTest Path path = new Path(new File(testMeta.dir).getAbsolutePath()); FileContext.getLocalFSFileContext().delete(path, true); int file; - for (file=0; file<4; file++) { - FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n"); + for (file = 0; file < 4; file++) { + FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n"); } CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); int wid = 0; //Read some records oper.setup(testMeta.context); - for(int i = 0; i < 5; i++) { + for (int i = 0; i < 5; i++) { oper.beginWindow(wid); oper.emitTuples(); oper.endWindow(); @@ -357,8 +359,8 @@ public class AbstractFileInputOperatorTest } sink.clear(); - for(int i = 0; i < 10; i++) { - for(AbstractFileInputOperator<String> o : opers) { + for (int i = 0; i < 10; i++) { + for (AbstractFileInputOperator<String> o : opers) { o.beginWindow(wid); o.emitTuples(); o.endWindow(); @@ -391,20 +393,20 @@ public class AbstractFileInputOperatorTest Path path = new Path(new File(testMeta.dir).getAbsolutePath()); FileContext.getLocalFSFileContext().delete(path, true); int file; - for (file=0; file<4; file++) { - FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n"); + for (file = 0; file < 4; file++) { + FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n"); } CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + @SuppressWarnings({"unchecked", "rawtypes"}) + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); int wid = 0; //Read some records oper.setup(testMeta.context); - for(int i = 0; i < 5; i++) { + for (int i = 0; i < 5; i++) { oper.beginWindow(wid); oper.emitTuples(); oper.endWindow(); @@ -442,8 +444,8 @@ public class AbstractFileInputOperatorTest } sink.clear(); - for(int i = 0; i < 10; i++) { - for(AbstractFileInputOperator<String> o : opers) { + for (int i = 0; i < 10; i++) { + for (AbstractFileInputOperator<String> o : opers) { o.beginWindow(wid); o.emitTuples(); o.endWindow(); @@ -475,7 +477,7 @@ public class AbstractFileInputOperatorTest CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({ "unchecked", "rawtypes" }) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); @@ -510,7 +512,7 @@ public class AbstractFileInputOperatorTest CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({"unchecked", "rawtypes"}) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); @@ -545,7 +547,7 @@ public class AbstractFileInputOperatorTest CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({"unchecked", "rawtypes"}) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); @@ -581,7 +583,7 @@ public class AbstractFileInputOperatorTest CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({"unchecked", "rawtypes"}) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); @@ -713,7 +715,7 @@ public class AbstractFileInputOperatorTest CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({"unchecked", "rawtypes"}) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); @@ -775,7 +777,7 @@ public class AbstractFileInputOperatorTest CollectorTestSink<String> queryResults = new CollectorTestSink<String>(); @SuppressWarnings({"unchecked", "rawtypes"}) - CollectorTestSink<Object> sink = (CollectorTestSink) queryResults; + CollectorTestSink<Object> sink = (CollectorTestSink)queryResults; oper.output.setSink(sink); oper.setDirectory(testMeta.dir); @@ -837,7 +839,7 @@ public class AbstractFileInputOperatorTest List<TestStorageManager> storageManagers = Lists.newLinkedList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - storageManagers.add((TestStorageManager) p.getPartitionedInstance().idempotentStorageManager); + storageManagers.add((TestStorageManager)p.getPartitionedInstance().idempotentStorageManager); } Assert.assertEquals("count of storage managers", 2, storageManagers.size());
