This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new 9007591 Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)
9007591 is described below
commit 90075913f72add0a478c3058ef1fc0d67fcdd305
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Jul 9 12:17:05 2018 -0700
Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907) (#5973)

Ported without cherry-pick, since the original commit depends on the patch that splits KafkaIndexTask.
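For context, the bug class being fixed: SequenceMetadata's endOffsets map was read by the main thread while the HTTP thread could mutate it through setEndOffsets(). A minimal, self-contained sketch of that failure mode (hypothetical class name, not code from this patch):

// A minimal, hypothetical sketch of the failure mode fixed here (not code
// from the patch): one thread iterates a plain HashMap while another thread
// structurally modifies it, which typically throws
// ConcurrentModificationException.
import java.util.HashMap;
import java.util.Map;

public class CmeSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    final Map<Integer, Long> endOffsets = new HashMap<>();
    for (int partition = 0; partition < 64; partition++) {
      endOffsets.put(partition, Long.MAX_VALUE);
    }

    // Stands in for the HTTP thread calling setEndOffsets(); put/remove of
    // new keys are structural modifications, so concurrent iterators fail.
    final Thread writer = new Thread(() -> {
      for (int i = 0; i < 1_000_000; i++) {
        endOffsets.put(64 + (i % 64), (long) i);
        endOffsets.remove(64 + (i % 64));
      }
    });
    writer.start();

    try {
      // Stands in for the main thread iterating endOffsets, e.g. in toString().
      for (int i = 0; i < 1_000_000; i++) {
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
          entry.getValue();
        }
      }
    }
    catch (java.util.ConcurrentModificationException e) {
      System.out.println("iteration raced with a concurrent modification: " + e);
    }
    writer.join();
  }
}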
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 152 ++++++++++++++-------
1 file changed, 103 insertions(+), 49 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d7bffed..62aca36 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -44,7 +44,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
@@ -72,6 +71,7 @@ import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@@ -85,9 +85,9 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2059,13 +2059,19 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private static class SequenceMetadata
{
+ /**
+ * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+ * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
private final int sequenceId;
private final String sequenceName;
private final Map<Integer, Long> startOffsets;
private final Map<Integer, Long> endOffsets;
private final Set<Integer> assignments;
private final boolean sentinel;
- private volatile boolean checkpointed;
+ private boolean checkpointed;
@JsonCreator
public SequenceMetadata(
@@ -2082,8 +2088,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
- this.endOffsets = Maps.newHashMap(endOffsets);
- this.assignments = Sets.newHashSet(startOffsets.keySet());
+ this.endOffsets = new HashMap<>(endOffsets);
+ this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
}
@@ -2097,7 +2103,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@JsonProperty
public boolean isCheckpointed()
{
- return checkpointed;
+ lock.lock();
+ try {
+ return checkpointed;
+ }
+ finally {
+ lock.unlock();
+ }
}
@JsonProperty
@@ -2115,7 +2127,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@JsonProperty
public Map<Integer, Long> getEndOffsets()
{
- return endOffsets;
+ lock.lock();
+ try {
+ return endOffsets;
+ }
+ finally {
+ lock.unlock();
+ }
}
@JsonProperty
@@ -2126,19 +2144,31 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
public void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
- endOffsets.putAll(newEndOffsets);
- checkpointed = true;
+ lock.lock();
+ try {
+ endOffsets.putAll(newEndOffsets);
+ checkpointed = true;
+ }
+ finally {
+ lock.unlock();
+ }
}
public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
- assignments.clear();
- nextPartitionOffset.entrySet().forEach(partitionOffset -> {
- if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
- > 0) {
- assignments.add(partitionOffset.getKey());
- }
- });
+ lock.lock();
+ try {
+ assignments.clear();
+ nextPartitionOffset.entrySet().forEach(partitionOffset -> {
+ if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
+ > 0) {
+ assignments.add(partitionOffset.getKey());
+ }
+ });
+ }
+ finally {
+ lock.unlock();
+ }
}
public boolean isOpen()
@@ -2148,10 +2178,16 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
boolean canHandle(ConsumerRecord<byte[], byte[]> record)
{
- return isOpen()
- && endOffsets.get(record.partition()) != null
- && record.offset() >= startOffsets.get(record.partition())
- && record.offset() < endOffsets.get(record.partition());
+ lock.lock();
+ try {
+ return isOpen()
+ && endOffsets.get(record.partition()) != null
+ && record.offset() >= startOffsets.get(record.partition())
+ && record.offset() < endOffsets.get(record.partition());
+ }
+ finally {
+ lock.unlock();
+ }
}
private SequenceMetadata()
@@ -2173,15 +2209,21 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Override
public String toString()
{
- return "SequenceMetadata{" +
- "sequenceName='" + sequenceName + '\'' +
- ", sequenceId=" + sequenceId +
- ", startOffsets=" + startOffsets +
- ", endOffsets=" + endOffsets +
- ", assignments=" + assignments +
- ", sentinel=" + sentinel +
- ", checkpointed=" + checkpointed +
- '}';
+ lock.lock();
+ try {
+ return "SequenceMetadata{" +
+ "sequenceName='" + sequenceName + '\'' +
+ ", sequenceId=" + sequenceId +
+ ", startOffsets=" + startOffsets +
+ ", endOffsets=" + endOffsets +
+ ", assignments=" + assignments +
+ ", sentinel=" + sentinel +
+ ", checkpointed=" + checkpointed +
+ '}';
+ }
+ finally {
+ lock.unlock();
+ }
}
@@ -2194,28 +2236,40 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@Override
public Object getMetadata()
{
- Preconditions.checkState(
- assignments.isEmpty(),
- "This committer can be used only once all the records till
offsets [%s] have been consumed, also make sure to call updateAssignments
before using this committer",
- endOffsets
- );
+ lock.lock();
- // merge endOffsets for this sequence with globally lastPersistedOffsets
- // This is done because this committer would be persisting only sub set of segments
- // corresponding to the current sequence. Generally, lastPersistedOffsets should already
- // cover endOffsets but just to be sure take max of offsets and persist that
- for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
- lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
- partitionOffset.getValue(),
- lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
- ));
- }
+ try {
+ Preconditions.checkState(
+ assignments.isEmpty(),
+ "This committer can be used only once all the records till
offsets [%s] have been consumed, also make"
+ + " sure to call updateAssignments before using this
committer",
+ endOffsets
+ );
- // Publish metadata can be different from persist metadata as we are going to publish only
- // subset of segments
- return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
- METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
- );
+ // merge endOffsets for this sequence with globally lastPersistedOffsets
+ // This is done because this committer would be persisting only sub set of segments
+ // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+ // cover endOffsets but just to be sure take max of offsets and persist that
+ for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
+ lastPersistedOffsets.put(
+ partitionOffset.getKey(),
+ Math.max(
+ partitionOffset.getValue(),
+ lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+ )
+ );
+ }
+
+ // Publish metadata can be different from persist metadata as we are going to publish only
+ // subset of segments
+ return ImmutableMap.of(
+ METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
+ METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
+ );
+ }
+ finally {
+ lock.unlock();
+ }
}
@Override
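For reference, the locking discipline the patch applies throughout SequenceMetadata, reduced to a standalone, hypothetical sketch: every read and write of the shared state acquires the same ReentrantLock, which also supplies the visibility guarantee that previously came from marking checkpointed volatile (the patch drops that modifier).

// A minimal sketch of the locking pattern the patch applies (hypothetical
// class, simplified from SequenceMetadata): every access to the shared map
// goes through one ReentrantLock, so the HTTP thread's setEndOffsets() can
// never interleave with another thread's iteration.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class LockedOffsets
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Long> endOffsets = new HashMap<>();

  void setEndOffsets(Map<Integer, Long> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
    }
    finally {
      lock.unlock();
    }
  }

  Map<Integer, Long> snapshot()
  {
    lock.lock();
    try {
      // Copy under the lock so callers can iterate without holding it.
      return new HashMap<>(endOffsets);
    }
    finally {
      lock.unlock();
    }
  }
}

Returning a defensive copy, as in snapshot() above, is one way to keep iteration outside the critical section; the patch instead takes the lock inside each reader, such as toString() and getMetadata().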
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]