lukecwik commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r473396947
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -59,7 +59,41 @@
}
}
- private final Cache<KV<String, ByteString>, CacheEntry> cache;
+ private static class CacheKey {
Review comment:
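Use `@AutoValue` so that `equals`, `hashCode`, and `toString` for this cache key are generated rather than hand-written.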
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
CacheBuilder.newBuilder()
.expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
.removalListener(
- (RemovalNotification<KV<String, ByteString>, CacheEntry>
notification) -> {
+ (RemovalNotification<CacheKey, CacheEntry> notification) -> {
if (notification.getCause() != RemovalCause.EXPLICIT) {
- LOG.info("Closing idle reader for {}",
keyToString(notification.getKey()));
+ LOG.info("Closing idle reader for {}",
notification.getKey().toString());
closeReader(notification.getKey(),
notification.getValue());
}
})
.build();
}
- private static String keyToString(KV<String, ByteString> key) {
- return key.getKey() + "-" + key.getValue().toStringUtf8();
- }
-
/** Close the reader and log a warning if close fails. */
- private void closeReader(KV<String, ByteString> key, CacheEntry entry) {
+ private void closeReader(CacheKey key, CacheEntry entry) {
try {
entry.reader.close();
} catch (IOException e) {
- LOG.warn("Failed to close UnboundedReader for {}", keyToString(key), e);
+ LOG.warn("Failed to close UnboundedReader for {}", key.toString(), e);
Review comment:
```suggestion
LOG.warn("Failed to close UnboundedReader for {}", key, e);
```
##########
File path:
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
Mockito.verifyNoMoreInteractions(mockExecutor);
}
+ @Test
+ public void testActiveWorkForShardedKeys() throws Exception {
+ BoundedQueueExecutor mockExecutor =
Mockito.mock(BoundedQueueExecutor.class);
+ StreamingDataflowWorker.ComputationState computationState =
+ new StreamingDataflowWorker.ComputationState(
+ "computation",
+
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+ mockExecutor,
+ ImmutableMap.of(),
+ null);
+
+ ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"),
1);
Review comment:
```suggestion
ShardedKey key1Shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"),
1);
```
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -1095,14 +1098,46 @@ public void run() {
}
}
};
- if (!computationState.activateWork(workItem.getKey(), work)) {
+ if (!computationState.activateWork(
+ new ShardedKey(workItem.getKey(), workItem.getShardingKey()), work)) {
// Free worker if the work was not activated.
// This can happen if it's duplicate work or some other reason.
sdkHarnessRegistry.completeWork(worker);
}
}
+ static class ShardedKey {
Review comment:
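Use `@AutoValue` so that `equals`, `hashCode`, and `toString` for this key class are generated rather than hand-written.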
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
CacheBuilder.newBuilder()
.expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
.removalListener(
- (RemovalNotification<KV<String, ByteString>, CacheEntry>
notification) -> {
+ (RemovalNotification<CacheKey, CacheEntry> notification) -> {
if (notification.getCause() != RemovalCause.EXPLICIT) {
- LOG.info("Closing idle reader for {}",
keyToString(notification.getKey()));
+ LOG.info("Closing idle reader for {}",
notification.getKey().toString());
Review comment:
No need to call `toString()`; the logger's `{}` placeholder already converts the argument to a string.
```suggestion
LOG.info("Closing idle reader for {}",
notification.getKey());
```
##########
File path:
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
Mockito.verifyNoMoreInteractions(mockExecutor);
}
+ @Test
+ public void testActiveWorkForShardedKeys() throws Exception {
+ BoundedQueueExecutor mockExecutor =
Mockito.mock(BoundedQueueExecutor.class);
+ StreamingDataflowWorker.ComputationState computationState =
+ new StreamingDataflowWorker.ComputationState(
+ "computation",
+
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+ mockExecutor,
+ ImmutableMap.of(),
+ null);
+
+ ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"),
1);
+ ShardedKey key1_shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"),
2);
Review comment:
```suggestion
ShardedKey key1Shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"),
2);
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]