Repository: samza
Updated Branches:
  refs/heads/samza-sql adcd26678 -> 1dac25e17


http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
deleted file mode 100644
index 8faa92c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTriggerBuilder{
-  private Field earlyTriggerField;
-  private Field lateTriggerField;
-  private Field timerTriggerField;
-  private Field earlyTriggerUpdater;
-  private Field lateTriggerUpdater;
-
-  @Before
-  public void testPrep() throws Exception {
-    this.earlyTriggerField = 
TriggerBuilder.class.getDeclaredField("earlyTrigger");
-    this.lateTriggerField = 
TriggerBuilder.class.getDeclaredField("lateTrigger");
-    this.timerTriggerField = 
TriggerBuilder.class.getDeclaredField("timerTrigger");
-    this.earlyTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
-    this.lateTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
-
-    this.earlyTriggerField.setAccessible(true);
-    this.lateTriggerField.setAccessible(true);
-    this.timerTriggerField.setAccessible(true);
-    this.earlyTriggerUpdater.setAccessible(true);
-    this.lateTriggerUpdater.setAccessible(true);
-  }
-
-  @Test public void testStaticCreators() throws NoSuchFieldException, 
IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    when(mockState.getNumberMessages()).thenReturn(2000L);
-    assertTrue(triggerField.apply(null, mockState));
-
-    Function<TestMessage, Boolean> tokenFunc = m -> true;
-    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    TestMessage m = mock(TestMessage.class);
-    assertTrue(triggerField.apply(m, mockState));
-
-    builder = 
TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
-    when(m.getTimestamp()).thenReturn(19999000000L);
-    assertFalse(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-    when(m.getTimestamp()).thenReturn(1001000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
mockFunc = mock(BiFunction.class);
-    builder = TriggerBuilder.earlyTrigger(mockFunc);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    assertEquals(triggerField, mockFunc);
-
-    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddTimerTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addTimeoutSinceFirstMessage(10000L);
-    // exam that both earlyTrigger and timer triggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
triggerField =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    // check the timer trigger
-    Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
-        (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    // exam that both early trigger and timer triggers are set up
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    triggerField = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    builder.addTimeoutSinceLastMessage(20000L);
-    // check the timer trigger
-    timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
-    when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test public void testAddLateTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTriggerOnSizeLimit(10000L);
-    // exam that both earlyTrigger and lateTriggers are set up
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
earlyTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // check the late trigger
-    BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> 
lateTrigger =
-        (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, 
Boolean>) this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    // set the number of messages to 10001 to trigger the late trigger
-    when(mockState.getNumberMessages()).thenReturn(10001L);
-    assertTrue(lateTrigger.apply(null, mockState));
-
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
-    // exam that both earlyTrigger and lateTriggers are set up
-    earlyTrigger = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // exam the lateTrigger
-    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
-    lateTrigger = (BiFunction<TestMessage, 
WindowState<Collection<TestMessage>>, Boolean>) 
this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    List<TestMessage> mockList = mock(ArrayList.class);
-    when(mockList.size()).thenReturn(200);
-    when(mockState.getOutputValue()).thenReturn(mockList);
-    assertTrue(lateTrigger.apply(null, mockState));
-  }
-
-  @Test public void testAddTriggerUpdater() throws IllegalAccessException {
-    TriggerBuilder<TestMessage, Collection<TestMessage>> builder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.onEarlyTrigger(c -> { c.clear(); return c;} );
-    List<TestMessage> collection = new ArrayList<TestMessage>() {{
-      for(int i = 0; i < 10; i++) {
-        this.add(new TestMessage(String.format("key-%d", i), "string-value", 
System.nanoTime()));
-      }
-    }};
-    // exam that earlyTriggerUpdater is set up
-    Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
-    WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    earlyTriggerUpdater.apply(mockState);
-    assertTrue(collection.isEmpty());
-
-    collection.add(new TestMessage("key-to-stay", "string-to-stay", 
System.nanoTime()));
-    collection.add(new TestMessage("key-to-remove", "string-to-remove", 
System.nanoTime()));
-    builder.onLateTrigger(c -> {
-      c.removeIf(t -> t.getKey().equals("key-to-remove"));
-      return c;
-    });
-    // check the late trigger updater
-    Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>> lateTriggerUpdater =
-        (Function<WindowState<Collection<TestMessage>>, 
WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    lateTriggerUpdater.apply(mockState);
-    assertTrue(collection.size() == 1);
-    assertFalse(collection.get(0).isDelete());
-    assertEquals(collection.get(0).getKey(), "key-to-stay");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
deleted file mode 100644
index 47a37dc..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.Windows.Window;
-import org.apache.samza.operators.api.internal.Trigger;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestWindows {
-
-  @Test public void testSessionWindows() throws NoSuchFieldException, 
IllegalAccessException {
-    // test constructing the default session window
-    Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, 
Collection<TestMessage>>> testWnd = Windows.intoSessions(
-        TestMessage::getKey);
-    assertTrue(testWnd instanceof Windows.SessionWindow);
-    Field wndKeyFuncField = 
Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
-    Field aggregatorField = 
Windows.SessionWindow.class.getDeclaredField("aggregator");
-    wndKeyFuncField.setAccessible(true);
-    aggregatorField.setAccessible(true);
-    Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) 
wndKeyFuncField.get(testWnd);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 
0)), "test-key");
-    BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> 
aggrFunc =
-        (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd);
-    TestMessage mockMsg = mock(TestMessage.class);
-    Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new 
ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains(mockMsg));
-
-    // test constructing the session window w/ customized session info
-    Window<TestMessage, String, Collection<Character>, WindowOutput<String, 
Collection<Character>>> testWnd2 = Windows.intoSessions(
-        m -> String.format("key-%d", m.getTimestamp()), m -> 
m.getMessage().charAt(0));
-    assertTrue(testWnd2 instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
-    aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, 
Collection<TestMessage>>) aggregatorField.get(testWnd2);
-    assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 
0)), "key-0");
-    when(mockMsg.getMessage()).thenReturn("x-001");
-    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains('x'));
-
-    // test constructing session window w/ a default counter
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> 
testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    assertTrue(testCounter instanceof Windows.SessionWindow);
-    wndKeyFunc = (Function<TestMessage, String>) 
wndKeyFuncField.get(testCounter);
-    BiFunction<TestMessage, Integer, Integer> counterFn = 
(BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
-    when(mockMsg.getTimestamp()).thenReturn(12345L);
-    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
-    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
-  }
-
-  @Test public void testSetTriggers() throws NoSuchFieldException, 
IllegalAccessException {
-    Window<TestMessage, String, Integer, WindowOutput<String, Integer>> 
testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getTimestamp()));
-    // test session window w/ a trigger
-    TriggerBuilder<TestMessage, Integer> triggerBuilder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
-    testCounter.setTriggers(triggerBuilder);
-    Trigger<TestMessage, WindowState<Integer>> expectedTrigger = 
triggerBuilder.build();
-    Trigger<TestMessage, WindowState<Integer>> actualTrigger = 
Windows.getInternalWindowFn(testCounter).getTrigger();
-    // examine all trigger fields are expected
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field earlyTriggerUpdater = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdater = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdater.setAccessible(true);
-    lateTriggerUpdater.setAccessible(true);
-    assertEquals(earlyTriggerField.get(expectedTrigger), 
earlyTriggerField.get(actualTrigger));
-    assertEquals(lateTriggerField.get(expectedTrigger), 
lateTriggerField.get(actualTrigger));
-    assertEquals(timerTriggerField.get(expectedTrigger), 
timerTriggerField.get(actualTrigger));
-    assertEquals(earlyTriggerUpdater.get(expectedTrigger), 
earlyTriggerUpdater.get(actualTrigger));
-    assertEquals(lateTriggerUpdater.get(expectedTrigger), 
lateTriggerUpdater.get(actualTrigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
deleted file mode 100644
index e953078..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIncomingSystemMessage {
-
-  @Test public void testConstructor() {
-    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    IncomingSystemMessage ism = new IncomingSystemMessage(ime);
-
-    Object mockKey = mock(Object.class);
-    Object mockValue = mock(Object.class);
-    LongOffset testOffset = new LongOffset("12345");
-    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
-
-    when(ime.getKey()).thenReturn(mockKey);
-    when(ime.getMessage()).thenReturn(mockValue);
-    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
-    when(ime.getOffset()).thenReturn("12345");
-
-    assertEquals(ism.getKey(), mockKey);
-    assertEquals(ism.getMessage(), mockValue);
-    assertEquals(ism.getSystemStreamPartition(), mockSsp);
-    assertEquals(ism.getOffset(), testOffset);
-    assertFalse(ism.isDelete());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
deleted file mode 100644
index 10775ec..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.data;
-
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-
-public class TestLongOffset {
-
-  @Test public void testConstructor() throws Exception {
-    LongOffset o1 = new LongOffset("12345");
-    Field offsetField = LongOffset.class.getDeclaredField("offset");
-    offsetField.setAccessible(true);
-    Long x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    o1 = new LongOffset("012345");
-    x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    try {
-      o1 = new LongOffset("xyz");
-      fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
-    } catch (NumberFormatException nfe) {
-      // expected
-    }
-  }
-
-  @Test public void testComparator() {
-    LongOffset o1 = new LongOffset("11111");
-    Offset other = mock(Offset.class);
-    try {
-      o1.compareTo(other);
-      fail("compareTo() should have have failed when comparing to an object of a different class");
-    } catch (IllegalArgumentException iae) {
-      // expected
-    }
-
-    LongOffset o2 = new LongOffset("-10000");
-    assertEquals(o1.compareTo(o2), 1);
-    LongOffset o3 = new LongOffset("22222");
-    assertEquals(o1.compareTo(o3), -1);
-    LongOffset o4 = new LongOffset("11111");
-    assertEquals(o1.compareTo(o4), 0);
-  }
-
-  @Test public void testEquals() {
-    LongOffset o1 = new LongOffset("12345");
-    Offset other = mock(Offset.class);
-    assertFalse(o1.equals(other));
-
-    LongOffset o2 = new LongOffset("0012345");
-    assertTrue(o1.equals(o2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
deleted file mode 100644
index 6dc77e5..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperators {
-
-  private class TestMessage implements Message<String, Object> {
-    private final long timestamp;
-    private final String key;
-    private final Object msg;
-
-
-    TestMessage(String key, Object msg, long timestamp) {
-      this.timestamp = timestamp;
-      this.key = key;
-      this.msg = msg;
-    }
-
-    @Override public Object getMessage() {
-      return this.msg;
-    }
-
-    @Override public String getKey() {
-      return this.key;
-    }
-
-    @Override public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  @Test public void testGetStreamOperator() {
-    Function<Message, Collection<TestMessage>> transformFn = m -> new 
ArrayList<TestMessage>() {{
-      this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
-    }};
-    Operators.StreamOperator<Message, TestMessage> strmOp = 
Operators.getStreamOperator(transformFn);
-    assertEquals(strmOp.getFunction(), transformFn);
-    assertTrue(strmOp.getOutputStream() instanceof MessageStream);
-  }
-
-  @Test public void testGetSinkOperator() {
-    MessageStream.VoidFunction3<TestMessage, MessageCollector, 
TaskCoordinator> sinkFn = (m, c, t) -> {};
-    Operators.SinkOperator<TestMessage> sinkOp = 
Operators.getSinkOperator(sinkFn);
-    assertEquals(sinkOp.getFunction(), sinkFn);
-    assertTrue(sinkOp.getOutputStream() == null);
-  }
-
-  @Test public void testGetWindowOperator() {
-    WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, 
Integer>> windowFn = mock(WindowFn.class);
-    BiFunction<TestMessage, Entry<String, WindowState<Integer>>, 
WindowOutput<String, Integer>> xFunction = (m, e) -> null;
-    Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> 
storeFns = mock(Operators.StoreFunctions.class);
-    Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
-    MessageStream<TestMessage> mockInput = mock(MessageStream.class);
-    when(windowFn.getTransformFunc()).thenReturn(xFunction);
-    when(windowFn.getStoreFuncs()).thenReturn(storeFns);
-    when(windowFn.getTrigger()).thenReturn(trigger);
-    when(mockInput.toString()).thenReturn("mockStream1");
-
-    Operators.WindowOperator<TestMessage, String, WindowState<Integer>, 
WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
-    assertEquals(windowOp.getFunction(), xFunction);
-    assertEquals(windowOp.getStoreFunctions(), storeFns);
-    assertEquals(windowOp.getTrigger(), trigger);
-    assertEquals(windowOp.getStoreName(mockInput), 
String.format("input-mockStream1-wndop-%s", windowOp.toString()));
-  }
-
-  @Test public void testGetPartialJoinOperator() {
-    BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
-        (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
-            Math.max(m1.getTimestamp(), m2.getTimestamp()));
-    MessageStream<TestMessage> joinOutput = new MessageStream<>();
-    Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, 
?>, TestMessage> partialJoin =
-        Operators.getPartialJoinOperator(merger, joinOutput);
-
-    assertEquals(partialJoin.getOutputStream(), joinOutput);
-    Message<Object, Object> m = mock(Message.class);
-    Message<Object, Object> s = mock(Message.class);
-    assertEquals(partialJoin.getFunction(), merger);
-    
assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), 
m.getKey());
-    
assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), 
m);
-    
assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), 
m.getKey());
-    assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
-  }
-
-  @Test public void testGetMergeOperator() {
-    MessageStream<TestMessage> output = new MessageStream<>();
-    Operators.StreamOperator<TestMessage, TestMessage> mergeOp = 
Operators.getMergeOperator(output);
-    Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new 
ArrayList<TestMessage>() {{
-      this.add(t);
-    }};
-    TestMessage t = mock(TestMessage.class);
-    assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getOutputStream(), output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
deleted file mode 100644
index 727276a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.data.Message;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestTrigger {
-
-  @Test public void testConstructor() throws Exception {
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
-    BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
-    Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
-    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { s.setOutputValue(0); return s; };
-    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { s.setOutputValue(1); return s; };
-
-    Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
-        earlyTriggerUpdater, lateTriggerUpdater);
-
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdaterField.setAccessible(true);
-    lateTriggerUpdaterField.setAccessible(true);
-
-    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
-    assertEquals(timerTrigger, timerTriggerField.get(trigger));
-    assertEquals(lateTrigger, lateTriggerField.get(trigger));
-    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
-    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
deleted file mode 100644
index f3cf0e0..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api.internal;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
-
-  @Test public void testConstructor() {
-    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
-    assertEquals(wndOutput.getKey(), "testMsg");
-    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
-    assertFalse(wndOutput.isDelete());
-    assertEquals(wndOutput.getTimestamp(), 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
index d4d2378..33901a9 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.Windows;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.Windows;
 import org.apache.samza.task.TaskContext;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
index d228784..a49bfd3 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator;
-import org.apache.samza.operators.api.internal.Operators.SinkOperator;
-import org.apache.samza.operators.api.internal.Operators.StreamOperator;
-import org.apache.samza.operators.api.internal.Operators.WindowOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
 import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
 import org.apache.samza.operators.impl.window.SessionWindowImpl;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index d296111..4bd467d 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
index 14796fc..224245e 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
+import org.apache.samza.operators.TestMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
index c8c4944..69f16d0 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.internal.Operators.StreamOperator;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.StreamOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index e711bc5..cdac3fc 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestOutputMessage;
-import org.apache.samza.operators.api.internal.Operators.SinkOperator;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.internal.Operators.SinkOperator;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
index eb8937a..5ede757 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
index 75cb00c..719ab99 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.operators.impl.window;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.TestMessage;
-import org.apache.samza.operators.api.WindowState;
-import org.apache.samza.operators.api.internal.Operators.StoreFunctions;
-import org.apache.samza.operators.api.internal.Operators.WindowOperator;
-import org.apache.samza.operators.api.internal.WindowOutput;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.TestMessage;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.internal.Operators.StoreFunctions;
+import org.apache.samza.operators.internal.Operators.WindowOperator;
+import org.apache.samza.operators.internal.WindowOutput;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
index 724bbba..493a688 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
@@ -20,12 +20,13 @@
 package org.apache.samza.task;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.Windows;
-import org.apache.samza.operators.api.TriggerBuilder;
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
index 33ae9c9..820d4f3 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.task;
 
-import org.apache.samza.operators.api.data.InputSystemMessage;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.data.InputSystemMessage;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.Offset;
 import org.apache.samza.system.SystemStreamPartition;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
index 825f4c4..00d01a8 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
@@ -19,10 +19,11 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.operators.api.MessageStream;
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.data.IncomingSystemMessage;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
index 306425e..153d517 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
@@ -20,6 +20,7 @@ package org.apache.samza.task;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.impl.ChainedOperators;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.Partition;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
index d6181ea..fe0ca42 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
@@ -36,7 +36,7 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Unit test for {@link StreamOperatorTask}
+ * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask}
  */
 public class TestStreamOperatorTasks {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
index 11186ea..de7bba5 100644
--- a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
@@ -19,11 +19,12 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.operators.api.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.api.TriggerBuilder;
-import org.apache.samza.operators.api.Windows;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.data.Offset;
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+import org.apache.samza.operators.TriggerBuilder;
+import org.apache.samza.operators.Windows;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.task.StreamOperatorTask;
 import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.Collection;

Reply via email to