twang126 commented on code in PR #28513:
URL: https://github.com/apache/beam/pull/28513#discussion_r1331993719
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2763,6 +2763,25 @@ public MockWork(long workToken) {
public void run() {}
}
+ private static class MockActiveWork extends StreamingDataflowWorker.Work {
+ public static volatile boolean exit;
Review Comment:
nit: comment the purpose of this, and why it has to be static volatile.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2923,6 +2942,90 @@ public void run() {
executor.shutdown();
}
+ @Test
+ public void testActiveThreadMetric() throws Exception {
+ int maxThreads = 5;
+ int threadExpiration = 60;
+ // setting up actual implementation of executor instead of mocking to keep
track of
+ // active thread count.
+ BoundedQueueExecutor executor =
+ new BoundedQueueExecutor(
+ maxThreads,
+ threadExpiration,
+ TimeUnit.SECONDS,
+ maxThreads,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("DataflowWorkUnits-%d")
+ .setDaemon(true)
+ .build());
+
+ StreamingDataflowWorker.ComputationState computationState =
+ new StreamingDataflowWorker.ComputationState(
+ "computation",
+
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+ executor,
+ ImmutableMap.of(),
+ null);
+
+ ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"),
1);
+
+ // real work
+ MockActiveWork m1 =
+ new MockActiveWork(1) {
+ @Override
+ public void run() {
+ int count = 0;
+ while (!exit) {
+ count += 1;
+ }
+ Thread.currentThread().interrupt();
+ }
+ };
+
+ // idle work
+ MockWork m2 =
+ new MockWork(2) {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+
+ // idle work
+ MockWork m3 =
+ new MockWork(3) {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ assertEquals(0, executor.activeCount());
+
+ assertTrue(computationState.activateWork(key1Shard1, m1));
+ executor.execute(m1, m1.getWorkItem().getSerializedSize());
+ Thread.sleep(1000);
Review Comment:
what's the purpose of this sleep? Are we waiting for the thread to get
started?
if we're waiting for a specific event, I think it might be cleaner and less
flaky to use a thread notification scheme:
https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods.
Basically instead of sleeping, wait for the notification to fire.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2923,6 +2942,90 @@ public void run() {
executor.shutdown();
}
+ @Test
+ public void testActiveThreadMetric() throws Exception {
+ int maxThreads = 5;
+ int threadExpiration = 60;
Review Comment:
nit: add a unit to the name (seconds/ms/etc) so threadExpirationSecs or w/e
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]