[ https://issues.apache.org/jira/browse/BEAM-14187?focusedWorklogId=755353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755353 ]
ASF GitHub Bot logged work on BEAM-14187:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Apr/22 17:55
Start Date: 11/Apr/22 17:55
Worklog Time Spent: 10m
Work Description: lukecwik commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r847592739
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -502,30 +513,65 @@ private Optional<SeekableByteChannel> initializeForKeyedRead(
inChannel = initializeBloomFilterAndIndexPerShard(inChannel);
- checkState(indexPerShard != null,
+ checkState(
+ indexPerShard != null,
"indexPerShard must not be null after initializeBloomFilterAndIndexPerShard.");
- // If the index has been populated and contains the shard id, we can return.
- if (indexPerShard != null && indexPerShard.containsKey(shardId)) {
- checkState(bloomFilter != null, "Bloom filter expected to have been initialized.");
+ if (indexPerShard.containsKey(shardId)) {
+ // ConcurrentHashMap.containsKey doesn't lock a bin, so it's faster than `computeIfAbsent` for
+ // a present key.
return inChannel;
}
- Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
- // If this is the last block, then we need to grab the position of the Bloom filter
- // as the upper bound.
- if (startOfNextBlock == null) {
- startOfNextBlock = footer.getBloomFilterPosition();
- }
+ // Using AtomicReference for an object holder. Actually, atomicity is not required.
+ AtomicReference<SeekableByteChannel> rawChannelReference =
+ new AtomicReference<>(inChannel.orNull());
+
+ // JDK-8161372 (ConcurrentHashMap.computeIfAbsent locks bin when k present) alleviated the
+ // performance loss by not acquiring lock for the first node. But, the fix was applied to Java9,
+ // and it still has chance to lock the bin containing the key at the second or next nodes. As we
+ // expect `indexPerShard` already has the shardId in most cases, it would have a better
+ // performance to check `containsKey` above before invoking `computeIfAbsent` here.
+ indexPerShard.computeIfAbsent(
+ shardId,
+ ignored -> {
+ Long startOfNextBlock = shardOffsetToShardMap.higherKey(shardWithIndex.getBlockOffset());
+ // If this is the last block, then we need to grab the position of the Bloom filter
+ // as the upper bound.
+ if (startOfNextBlock == null) {
+ startOfNextBlock = footer.getBloomFilterPosition();
+ }
+
+ checkState(
+ shardWithIndex.getIndexOffset() < startOfNextBlock,
+ "Expected the index start offset is less than the next block start offset. "
+ + "But, IsmShard is '%s' and the next block offset is %s for resourceId '%s'",
+ shardWithIndex,
+ startOfNextBlock,
+ resourceId);
+
+ try {
+ SeekableByteChannel rawChannel = openIfNeeded(Optional.of(rawChannelReference.get()));
+ rawChannelReference.set(rawChannel);
+ return readIndexBlockForShard(resourceId, shardWithIndex, startOfNextBlock, rawChannel);
+ } catch (IOException e) {
+ // Wrapping with RuntimeException
+ throw new RuntimeException(
+ "failed to read shard index for resourceId: " + resourceId + " shardId: " + shardId,
Review Comment:
```suggestion
"Failed to read shard index for resourceId: " + resourceId + " shardId: " + shardId,
```
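For illustration, the pattern under review (a lock-free fast path for keys that are already present, then `computeIfAbsent` with the checked `IOException` wrapped in an unchecked exception) could be sketched as follows. This is a minimal sketch and not the PR's code: `ShardIndexLoaderSketch`, `IndexLoader`, and `getOrLoad` are hypothetical names, the index is reduced to a `String`, and `UncheckedIOException` stands in for the plain `RuntimeException` used in the diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentHashMap;

public class ShardIndexLoaderSketch {
  /** Hypothetical loader that may fail with a checked IOException. */
  public interface IndexLoader {
    String load(int shardId) throws IOException;
  }

  private final ConcurrentHashMap<Integer, String> indexPerShard = new ConcurrentHashMap<>();

  public String getOrLoad(int shardId, IndexLoader loader) {
    // Fast path: a plain get() does not lock the bin, so it is cheap when the key is present.
    String cached = indexPerShard.get(shardId);
    if (cached != null) {
      return cached;
    }
    // Slow path: the mapping function runs at most once per key, even under contention.
    return indexPerShard.computeIfAbsent(
        shardId,
        id -> {
          try {
            return loader.load(id);
          } catch (IOException e) {
            // The mapping function cannot throw checked exceptions, so wrap it.
            throw new UncheckedIOException("Failed to read shard index for shardId: " + id, e);
          }
        });
  }
}
```

If the loader throws, `computeIfAbsent` rethrows the exception and records no mapping, so a later call for the same shard can retry.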
Issue Time Tracking
-------------------
Worklog Id: (was: 755353)
Time Spent: 2h 40m (was: 2.5h)
> NullPointerException or IllegalStateException at IsmReaderImpl in Dataflow
> --------------------------------------------------------------------------
>
> Key: BEAM-14187
> URL: https://issues.apache.org/jira/browse/BEAM-14187
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Minbo Bae
> Priority: P2
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> h6. Problem
> Dataflow Java batch jobs with large side inputs intermittently throw
> {{NullPointerException}} or {{IllegalStateException}}.
> * [NullPointerException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/npe.png] happens at [IsmReaderImpl.overKeyComponents|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L217].
> * [IllegalStateException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/IllegalStateException.png] happens at [IsmReaderImpl.initializeForKeyedRead|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500].
> (All error logs from the Dataflow job are [here|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/downloaded-logs-20220327-171955.json].)
> h6. Hypothesis
> {{initializeForKeyedRead}} is not synchronized. Multiple threads can enter the
> method, initialize the index for the same shard, and update {{indexPerShard}}
> without synchronization. {{overKeyComponents}} also accesses {{indexPerShard}}
> without synchronization. Because {{indexPerShard}} is a plain {{HashMap}}, which
> is not thread-safe, these concurrent accesses can cause the
> {{NullPointerException}} and {{IllegalStateException}} above.
> h6. Suggestion
> I think changing the type of {{indexPerShard}} to a thread-safe map
> (e.g. {{ConcurrentHashMap}}) would fix this issue.
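
As a rough illustration of the suggestion above, the sketch below has several threads request the index of the same shard from a `ConcurrentHashMap` at the same time and counts how often the index is actually built. All names (`ShardIndexCacheSketch`, `indexFor`, `loadsAfterConcurrentReads`) are hypothetical, the index is reduced to a `String`, and nothing here is taken from `IsmReaderImpl`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShardIndexCacheSketch {
  // Thread-safe replacement for a plain HashMap shared between reader threads.
  private final Map<Integer, String> indexPerShard = new ConcurrentHashMap<>();
  private final AtomicInteger loads = new AtomicInteger();

  public String indexFor(int shardId) {
    // computeIfAbsent is atomic: the mapping function runs at most once per key.
    return indexPerShard.computeIfAbsent(
        shardId,
        id -> {
          loads.incrementAndGet();
          return "index-" + id;
        });
  }

  /** Has {@code threads} threads read the same shard at once; returns the number of loads. */
  public static int loadsAfterConcurrentReads(int threads, int shardId) {
    ShardIndexCacheSketch cache = new ShardIndexCacheSketch();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < threads; i++) {
      pool.execute(
          () -> {
            try {
              start.await(); // Line all threads up so they hit the map together.
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
            cache.indexFor(shardId);
          });
    }
    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timed out waiting for reader threads");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for reader threads", e);
    }
    return cache.loads.get();
  }
}
```

With a `ConcurrentHashMap`, `loadsAfterConcurrentReads(8, 3)` reports a single load however the threads interleave. A plain `HashMap` offers no such guarantee and may be observed in an inconsistent state by concurrent readers.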
--
This message was sent by Atlassian Jira
(v8.20.1#820001)