m-trieu commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1367490042
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java:
##########
@@ -139,34 +168,54 @@ public void testActivateWorkForKey_QUEUED() {
public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() {
assertEquals(
Optional.empty(),
- activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey("someKey",
1L), 10L));
+ activeWorkState.completeWorkAndGetNextWorkForKey(
+ shardedKey("someKey", 1L), workDedupeToken(1L, 1L)));
}
@Test
- public void
testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchWorkToComplete()
{
- long workTokenToComplete = 1L;
+ public void
+
testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueWorkTokenDoesNotMatchWorkToComplete()
{
+ long workTokenInQueue = 2L;
+ long otherWorkToken = 1L;
+ long cacheToken = 1L;
+ Work workInQueue = createWork(createWorkItem(workTokenInQueue,
cacheToken));
+ ShardedKey shardedKey = shardedKey("someKey", 1L);
+
+ activeWorkState.activateWorkForKey(shardedKey, workInQueue);
+ activeWorkState.completeWorkAndGetNextWorkForKey(
+ shardedKey, workDedupeToken(otherWorkToken, cacheToken));
+
+ assertEquals(1, readOnlyActiveWork.get(shardedKey).size());
+ assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());
+ }
- Work workInQueue = createWork(createWorkItem(2L));
+ @Test
+ public void
+
testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueCacheTokenDoesNotMatchWorkToComplete()
{
+ long cacheTokenInQueue = 2L;
+ long otherCacheToken = 1L;
+ long workToken = 1L;
+ Work workInQueue = createWork(createWorkItem(workToken,
cacheTokenInQueue));
ShardedKey shardedKey = shardedKey("someKey", 1L);
activeWorkState.activateWorkForKey(shardedKey, workInQueue);
- activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey,
workTokenToComplete);
+ activeWorkState.completeWorkAndGetNextWorkForKey(
+ shardedKey, workDedupeToken(workToken, otherCacheToken));
assertEquals(1, readOnlyActiveWork.get(shardedKey).size());
assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());
}
@Test
public void
testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplete() {
- long workTokenToComplete = 1L;
-
- Work activeWork = createWork(createWorkItem(workTokenToComplete));
- Work nextWork = createWork(createWorkItem(2L));
+ Work activeWork = createWork(createWorkItem(1L, 1L));
+ Work nextWork = createWork(createWorkItem(2L, 2L));
ShardedKey shardedKey = shardedKey("someKey", 1L);
activeWorkState.activateWorkForKey(shardedKey, activeWork);
activeWorkState.activateWorkForKey(shardedKey, nextWork);
- activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey,
workTokenToComplete);
+ activeWorkState.completeWorkAndGetNextWorkForKey(
+ shardedKey, WorkDedupeKey.of(activeWork.getWorkItem()));
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]