jihoonson closed pull request #5973: [Backport] Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner
URL: https://github.com/apache/incubator-druid/pull/5973
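For context on the bug being backported: SequenceMetadata keeps its end offsets in a plain HashMap that the HTTP thread mutates (via setEndOffsets) while the main thread iterates it (for example in toString() or getMetadata()), which is the kind of unsynchronized access that can make HashMap throw ConcurrentModificationException. The sketch below is a minimal, self-contained illustration of that failure mode; the class and names are stand-ins, not Druid code.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: reproduces the kind of race the patch below guards against.
public class EndOffsetsRaceDemo
{
  private static final Map<Integer, Long> endOffsets = new HashMap<>();

  public static void main(String[] args) throws InterruptedException
  {
    for (int partition = 0; partition < 64; partition++) {
      endOffsets.put(partition, 0L);
    }

    // Stand-in for the HTTP thread repeatedly calling setEndOffsets(...),
    // i.e. putAll on the shared map.
    Thread httpThread = new Thread(() -> {
      Map<Integer, Long> newEndOffsets = new HashMap<>();
      for (long offset = 1; offset <= 500_000; offset++) {
        newEndOffsets.put((int) (offset % 256), offset);
        endOffsets.putAll(newEndOffsets);
      }
    });

    // Stand-in for the main thread iterating the map (toString() iterates it).
    // Iterating a HashMap that another thread is structurally modifying
    // typically fails with ConcurrentModificationException.
    Thread mainLoop = new Thread(() -> {
      for (int i = 0; i < 500_000; i++) {
        endOffsets.toString();
      }
    });

    httpThread.start();
    mainLoop.start();
    httpThread.join();
    mainLoop.join();
  }
}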
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d7bffedea17..62aca366e48 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -44,7 +44,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@@ -85,9 +85,9 @@
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ private boolean withinMinMaxRecordTime(final InputRow row)
private static class SequenceMetadata
{
+ /**
+ * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+ * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
private final int sequenceId;
private final String sequenceName;
private final Map<Integer, Long> startOffsets;
private final Map<Integer, Long> endOffsets;
private final Set<Integer> assignments;
private final boolean sentinel;
- private volatile boolean checkpointed;
+ private boolean checkpointed;
@JsonCreator
public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public SequenceMetadata(
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
- this.endOffsets = Maps.newHashMap(endOffsets);
- this.assignments = Sets.newHashSet(startOffsets.keySet());
+ this.endOffsets = new HashMap<>(endOffsets);
+ this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
}
@@ -2097,7 +2103,13 @@ public int getSequenceId()
@JsonProperty
public boolean isCheckpointed()
{
- return checkpointed;
+ lock.lock();
+ try {
+ return checkpointed;
+ }
+ finally {
+ lock.unlock();
+ }
}
@JsonProperty
@@ -2115,7 +2127,13 @@ public String getSequenceName()
@JsonProperty
public Map<Integer, Long> getEndOffsets()
{
- return endOffsets;
+ lock.lock();
+ try {
+ return endOffsets;
+ }
+ finally {
+ lock.unlock();
+ }
}
@JsonProperty
@@ -2126,19 +2144,31 @@ public boolean isSentinel()
public void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
- endOffsets.putAll(newEndOffsets);
- checkpointed = true;
+ lock.lock();
+ try {
+ endOffsets.putAll(newEndOffsets);
+ checkpointed = true;
+ }
+ finally {
+ lock.unlock();
+ }
}
public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
- assignments.clear();
- nextPartitionOffset.entrySet().forEach(partitionOffset -> {
- if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
- > 0) {
- assignments.add(partitionOffset.getKey());
- }
- });
+ lock.lock();
+ try {
+ assignments.clear();
+ nextPartitionOffset.entrySet().forEach(partitionOffset -> {
+ if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
+ > 0) {
+ assignments.add(partitionOffset.getKey());
+ }
+ });
+ }
+ finally {
+ lock.unlock();
+ }
}
public boolean isOpen()
@@ -2148,10 +2178,16 @@ public boolean isOpen()
boolean canHandle(ConsumerRecord<byte[], byte[]> record)
{
- return isOpen()
- && endOffsets.get(record.partition()) != null
- && record.offset() >= startOffsets.get(record.partition())
- && record.offset() < endOffsets.get(record.partition());
+ lock.lock();
+ try {
+ return isOpen()
+ && endOffsets.get(record.partition()) != null
+ && record.offset() >= startOffsets.get(record.partition())
+ && record.offset() < endOffsets.get(record.partition());
+ }
+ finally {
+ lock.unlock();
+ }
}
private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public static SequenceMetadata getSentinelSequenceMetadata()
@Override
public String toString()
{
- return "SequenceMetadata{" +
- "sequenceName='" + sequenceName + '\'' +
- ", sequenceId=" + sequenceId +
- ", startOffsets=" + startOffsets +
- ", endOffsets=" + endOffsets +
- ", assignments=" + assignments +
- ", sentinel=" + sentinel +
- ", checkpointed=" + checkpointed +
- '}';
+ lock.lock();
+ try {
+ return "SequenceMetadata{" +
+ "sequenceName='" + sequenceName + '\'' +
+ ", sequenceId=" + sequenceId +
+ ", startOffsets=" + startOffsets +
+ ", endOffsets=" + endOffsets +
+ ", assignments=" + assignments +
+ ", sentinel=" + sentinel +
+ ", checkpointed=" + checkpointed +
+ '}';
+ }
+ finally {
+ lock.unlock();
+ }
}
@@ -2194,28 +2236,40 @@ public String toString()
@Override
public Object getMetadata()
{
- Preconditions.checkState(
- assignments.isEmpty(),
- "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
- endOffsets
- );
+ lock.lock();
- // merge endOffsets for this sequence with globally lastPersistedOffsets
- // This is done because this committer would be persisting only sub set of segments
- // corresponding to the current sequence. Generally, lastPersistedOffsets should already
- // cover endOffsets but just to be sure take max of offsets and persist that
- for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
- lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
- partitionOffset.getValue(),
- lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
- ));
- }
+ try {
+ Preconditions.checkState(
+ assignments.isEmpty(),
+ "This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+ + " sure to call updateAssignments before using this committer",
+ endOffsets
+ );
- // Publish metadata can be different from persist metadata as we are going to publish only
- // subset of segments
- return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
- METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
- );
+ // merge endOffsets for this sequence with globally lastPersistedOffsets
+ // This is done because this committer would be persisting only sub set of segments
+ // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+ // cover endOffsets but just to be sure take max of offsets and persist that
+ for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
+ lastPersistedOffsets.put(
+ partitionOffset.getKey(),
+ Math.max(
+ partitionOffset.getValue(),
+ lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+ )
+ );
+ }
+
+ // Publish metadata can be different from persist metadata as we are going to publish only
+ // subset of segments
+ return ImmutableMap.of(
+ METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
+ METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
+ );
+ }
+ finally {
+ lock.unlock();
+ }
}
@Override
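In short, the patch drops the volatile modifier on checkpointed and instead guards the accesses to endOffsets and checkpointed with a single ReentrantLock, so no thread can iterate the map while another mutates it. Below is a minimal sketch of the same pattern, with illustrative names rather than the real SequenceMetadata class.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: mirrors the locking shape applied by the patch.
class GuardedSequenceState
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Long> endOffsets = new HashMap<>();
  private boolean checkpointed;

  // Writer side (the HTTP thread in the analogous Druid code path).
  void setEndOffsets(Map<Integer, Long> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
      checkpointed = true;
    }
    finally {
      lock.unlock();
    }
  }

  // Reader side (the main thread); iterating endOffsets via its toString()
  // is safe because the writer above holds the same lock.
  @Override
  public String toString()
  {
    lock.lock();
    try {
      return "GuardedSequenceState{endOffsets=" + endOffsets + ", checkpointed=" + checkpointed + '}';
    }
    finally {
      lock.unlock();
    }
  }
}

Note that a volatile flag alone would not have been enough here: visibility of checkpointed was never the hazard, concurrent structural modification of endOffsets during iteration was, and that needs mutual exclusion on both the reading and the writing side.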
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]