[
https://issues.apache.org/jira/browse/BEAM-14187?focusedWorklogId=754514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754514
]
ASF GitHub Bot logged work on BEAM-14187:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Apr/22 10:55
Start Date: 08/Apr/22 10:55
Worklog Time Spent: 10m
Work Description: baeminbo commented on code in PR #17201:
URL: https://github.com/apache/beam/pull/17201#discussion_r845993595
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java:
##########
@@ -370,7 +373,7 @@ boolean bloomFilterMightContain(RandomAccessData keyBytes) {
position(rawChannel, footer.getBloomFilterPosition());
bloomFilter =
ScalableBloomFilterCoder.of().decode(Channels.newInputStream(rawChannel));
- indexPerShard = new HashMap<>();
+ indexPerShard = new ConcurrentHashMap<>();
Review Comment:
After writing `IllegalStateException` scenario above, I realized that
changing the type of `indexPerShard` to a thread-safe Map cannot prevent
`IllegalStateException` at [`checkState(indexPerShard.get(K) ==
null)`](https://github.com/apache/beam/blob/v2.38.0-RC1/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500-L503).`.
I added additional
[commit](https://github.com/apache/beam/pull/17201/commits/b74fc99bee7e1aa11b38495d53ab0b3147c17b34)
to remove the check. It's not necessary as the condition is checked at
if-clause just before it.
In addition, as `initializeForKeyedRead` allows concurrent access, it has a
benefit to update `indexPerShard` for multiple shardIds, but it can update a
key multiple times if multiple threads try to update the key at the same time.
We can fine-control the synchronization, but it may make the code a little bit
more complex (e.g. using `computeIfAbsent` with `indexPerShard`). Do you think
we should do more optimization?
Issue Time Tracking
-------------------
Worklog Id: (was: 754514)
Time Spent: 1.5h (was: 1h 20m)
> NullPointerException or IllegalStateException at IsmReaderImpl in Dataflow
> --------------------------------------------------------------------------
>
> Key: BEAM-14187
> URL: https://issues.apache.org/jira/browse/BEAM-14187
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Minbo Bae
> Priority: P2
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> h6. Problem
> Dataflow Java batch jobs with large side input intermittently throws
> {{NullPointerException}} or {{{}IllegalStateException{}}}.
> *
> [NullPointerException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/npe.png]
> happens at
> [IsmReaderImpl.overKeyComponents|https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L217]:
> *
> [IllegalStateException|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/IllegalStateException.png]
> happens at [IsmReaderImpl. initializeForKeyedRead
> |https://github.com/apache/beam/blob/v2.37.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmReaderImpl.java#L500].
> (all error logs in the Dataflow job is
> [here|https://gist.githubusercontent.com/baeminbo/459e283eadbc7752c9f23616b52d958a/raw/f0480b8eaff590fb3f3ae2ab98ddce7dd3b4a237/downloaded-logs-20220327-171955.json].)
> h6. Hypothesis
> The {{initializeForKeyedRead}} is not synchronized. Multiple threads can
> enter the method so that initialize the index for the same shard and update
> {{indexPerShard}} without synchronization. And, the {{overKeyComponents}}
> also accesses {{indexPerShard}} without synchronization. As {{indexPerShard}}
> is just a {{HashMap}} which is not thread-safe, it can cause
> {{NullPointerException}} and {{IllegalStateException}} above.
> h6. Suggestion
> I think it can fix this issue if we change the type of {{indexPerShard}} to a
> thread-safe map (e.g. {{ConcurrentHashMap}}).
--
This message was sent by Atlassian Jira
(v8.20.1#820001)