This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccd79d62b7  Fix NPE caused by realtime segment closing race, fix 
possible missing-segment retry bug. (#15260)
5ccd79d62b7 is described below

commit 5ccd79d62b7efa0211ff5408baca979d3dda2594
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 20 10:48:14 2023 -0800

     Fix NPE caused by realtime segment closing race, fix possible 
missing-segment retry bug. (#15260)
    
    * Fix NPE caused by realtime segment closing race, fix possible 
missing-segment retry bug.
    
    Fixes #12168, by returning empty from FireHydrant when the segment is
    swapped to null. This causes the SinkQuerySegmentWalker to use
    ReportTimelineMissingSegmentQueryRunner, which causes the Broker to look
    for the segment somewhere else.
    
    In addition, this patch changes SinkQuerySegmentWalker to acquire references
    to all hydrants (subsegments of a sink) at once, and return a
    ReportTimelineMissingSegmentQueryRunner if *any* of them could not be 
acquired.
    I suspect, although have not confirmed, that the prior behavior could lead 
to
    segments being reported as missing even though results from some hydrants 
were
    still included.
    
    * Some more test coverage.
---
 .../apache/druid/segment/realtime/FireHydrant.java |   9 +-
 .../appenderator/SinkQuerySegmentWalker.java       | 128 ++++++++++-----------
 .../druid/segment/realtime/plumber/Sink.java       |  82 ++++++++++++-
 .../realtime/plumber/SinkSegmentReference.java     |  78 +++++++++++++
 .../druid/segment/realtime/FireHydrantTest.java    |  16 +++
 .../druid/segment/realtime/plumber/SinkTest.java   |  79 +++++++++++++
 6 files changed, 325 insertions(+), 67 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java 
b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 29a8986a04c..5f1a88f2ea9 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -175,6 +175,12 @@ public class FireHydrant
   )
   {
     ReferenceCountingSegment sinkSegment = adapter.get();
+
+    if (sinkSegment == null) {
+      // adapter can be null if this segment is removed (swapped to null) 
while being queried.
+      return Optional.empty();
+    }
+
     SegmentReference segment = segmentMapFn.apply(sinkSegment);
     while (true) {
       Optional<Closeable> reference = segment.acquireReferences();
@@ -186,7 +192,8 @@ public class FireHydrant
       // segment swap, the new segment should already be visible.
       ReferenceCountingSegment newSinkSegment = adapter.get();
       if (newSinkSegment == null) {
-        throw new ISE("FireHydrant was 'closed' by swapping segment to null 
while acquiring a segment");
+        // adapter can be null if this segment is removed (swapped to null) 
while being queried.
+        return Optional.empty();
       }
       if (sinkSegment == newSinkSegment) {
         if (newSinkSegment.isClosed()) {
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 81cd4db2556..7c81b60feab 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -62,6 +62,7 @@ import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -69,6 +70,7 @@ import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -169,17 +171,17 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
 
     // Make sure this query type can handle the subquery, if present.
-    if ((dataSourceFromQuery instanceof QueryDataSource) && 
!toolChest.canPerformSubquery(((QueryDataSource) 
dataSourceFromQuery).getQuery())) {
+    if ((dataSourceFromQuery instanceof QueryDataSource)
+        && !toolChest.canPerformSubquery(((QueryDataSource) 
dataSourceFromQuery).getQuery())) {
       throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
     }
 
     // segmentMapFn maps each base Segment into a joined Segment if necessary.
-    final Function<SegmentReference, SegmentReference> segmentMapFn = 
dataSourceFromQuery
-                                                                        
.createSegmentMapFunction(
-                                                                            
query,
-                                                                            
cpuTimeAccumulator
-                                                                        );
-
+    final Function<SegmentReference, SegmentReference> segmentMapFn =
+        dataSourceFromQuery.createSegmentMapFunction(
+            query,
+            cpuTimeAccumulator
+        );
 
     // We compute the join cache key here itself so it doesn't need to be 
re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = 
Optional.ofNullable(query.getDataSource().getCacheKey());
@@ -200,44 +202,34 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
 
           final Sink theSink = chunk.getObject();
           final SegmentId sinkSegmentId = theSink.getSegment().getId();
+          final List<SinkSegmentReference> segmentReferences =
+              theSink.acquireSegmentReferences(segmentMapFn, 
skipIncrementalSegment);
 
-          Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
-              Iterables.transform(
-                  theSink,
-                  hydrant -> {
-                    // Hydrant might swap at any point, but if it's swapped at 
the start
-                    // then we know it's *definitely* swapped.
-                    final boolean hydrantDefinitelySwapped = 
hydrant.hasSwapped();
-
-                    if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
-                      return new Pair<>(hydrant.getSegmentDataInterval(), new 
NoopQueryRunner<>());
-                    }
-
-                    // Prevent the underlying segment from swapping when its 
being iterated
-                    final Optional<Pair<SegmentReference, Closeable>> 
maybeSegmentAndCloseable =
-                        hydrant.getSegmentForQuery(segmentMapFn);
+          if (segmentReferences == null) {
+            // We failed to acquire references for all subsegments. Bail and 
report the entire sink missing.
+            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+          } else if (segmentReferences.isEmpty()) {
+            return new NoopQueryRunner<>();
+          }
 
-                    // if optional isn't present, we failed to acquire 
reference to the segment or any joinables
-                    if (!maybeSegmentAndCloseable.isPresent()) {
-                      return new Pair<>(
-                          hydrant.getSegmentDataInterval(),
-                          new 
ReportTimelineMissingSegmentQueryRunner<>(descriptor)
-                      );
-                    }
-                    final Pair<SegmentReference, Closeable> 
segmentAndCloseable = maybeSegmentAndCloseable.get();
-                    try {
+          final Closeable releaser = () -> 
CloseableUtils.closeAll(segmentReferences);
 
-                      QueryRunner<T> runner = 
factory.createRunner(segmentAndCloseable.lhs);
+          try {
+            Iterable<QueryRunner<T>> perHydrantRunners = new 
SinkQueryRunners<>(
+                Iterables.transform(
+                    segmentReferences,
+                    segmentReference -> {
+                      QueryRunner<T> runner = 
factory.createRunner(segmentReference.getSegment());
 
                       // 1) Only use caching if data is immutable
                       // 2) Hydrants are not the same between replicas, make 
sure cache is local
-                      if (hydrantDefinitelySwapped && cache.isLocal()) {
-                        StorageAdapter storageAdapter = 
segmentAndCloseable.lhs.asStorageAdapter();
+                      if (segmentReference.isImmutable() && cache.isLocal()) {
+                        StorageAdapter storageAdapter = 
segmentReference.getSegment().asStorageAdapter();
                         long segmentMinTime = 
storageAdapter.getMinTime().getMillis();
                         long segmentMaxTime = 
storageAdapter.getMaxTime().getMillis();
                         Interval actualDataInterval = 
Intervals.utc(segmentMinTime, segmentMaxTime + 1);
                         runner = new CachingQueryRunner<>(
-                            makeHydrantCacheIdentifier(hydrant),
+                            makeHydrantCacheIdentifier(sinkSegmentId, 
segmentReference.getHydrantNumber()),
                             cacheKeyPrefix,
                             descriptor,
                             actualDataInterval,
@@ -254,35 +246,33 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
                             cacheConfig
                         );
                       }
-                      // Make it always use Closeable to decrement()
-                      runner = QueryRunnerHelper.makeClosingQueryRunner(
-                          runner,
-                          segmentAndCloseable.rhs
-                      );
-                      return new 
Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
-                    }
-                    catch (Throwable e) {
-                      throw CloseableUtils.closeAndWrapInCatch(e, 
segmentAndCloseable.rhs);
+                      return new 
Pair<>(segmentReference.getSegment().getDataInterval(), runner);
                     }
-                  }
-              )
-          );
-          return new SpecificSegmentQueryRunner<>(
-              withPerSinkMetrics(
-                  new BySegmentQueryRunner<>(
-                      sinkSegmentId,
-                      descriptor.getInterval().getStart(),
-                      factory.mergeRunners(
-                          DirectQueryProcessingPool.INSTANCE,
-                          perHydrantRunners
-                      )
-                  ),
-                  toolChest,
-                  sinkSegmentId,
-                  cpuTimeAccumulator
-              ),
-              new SpecificSegmentSpec(descriptor)
-          );
+                )
+            );
+            return QueryRunnerHelper.makeClosingQueryRunner(
+                new SpecificSegmentQueryRunner<>(
+                    withPerSinkMetrics(
+                        new BySegmentQueryRunner<>(
+                            sinkSegmentId,
+                            descriptor.getInterval().getStart(),
+                            factory.mergeRunners(
+                                DirectQueryProcessingPool.INSTANCE,
+                                perHydrantRunners
+                            )
+                        ),
+                        toolChest,
+                        sinkSegmentId,
+                        cpuTimeAccumulator
+                    ),
+                    new SpecificSegmentSpec(descriptor)
+                ),
+                releaser
+            );
+          }
+          catch (Throwable e) {
+            throw CloseableUtils.closeAndWrapInCatch(e, releaser);
+          }
         }
     );
     final QueryRunner<T> mergedRunner =
@@ -361,8 +351,16 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     return sinkTimeline;
   }
 
-  public static String makeHydrantCacheIdentifier(FireHydrant input)
+  public static String makeHydrantCacheIdentifier(final FireHydrant hydrant)
+  {
+    return makeHydrantCacheIdentifier(hydrant.getSegmentId(), 
hydrant.getCount());
+  }
+
+  public static String makeHydrantCacheIdentifier(final SegmentId segmentId, 
final int hydrantNumber)
   {
-    return input.getSegmentId() + "_" + input.getCount();
+    // Cache ID like segmentId_H0, etc. The 'H' disambiguates subsegment 
[foo_x_y_z partition 0 hydrant 1]
+    // from full segment [foo_x_y_z partition 1], and is therefore useful if 
we ever want the cache to mix full segments
+    // with subsegments (hydrants).
+    return segmentId + "_H" + hydrantNumber;
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java 
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index 43e3f096541..e98a121accf 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
@@ -26,9 +27,13 @@ import com.google.common.collect.Lists;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.Query;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.column.ColumnFormat;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
@@ -40,8 +45,12 @@ import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Overshadowable;
 import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,14 +59,18 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
 public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
 {
   private static final IncrementalIndexAddResult ALREADY_SWAPPED =
       new IncrementalIndexAddResult(-1, -1, "write after index swapped");
+  private static final Logger log = new Logger(Sink.class);
 
   private final Object hydrantLock = new Object();
   private final Interval interval;
@@ -228,6 +241,7 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
 
   /**
    * Marks sink as 'finished', preventing further writes.
+   *
    * @return 'true' if sink was sucessfully finished, 'false' if sink was 
already finished
    */
   public boolean finishWriting()
@@ -288,6 +302,22 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
     }
   }
 
+  /**
+   * Acquire references to all {@link FireHydrant} that represent this sink. 
Returns null if they cannot all be
+   * acquired, possibly because they were closed (swapped to null) 
concurrently with this method being called.
+   *
+   * @param segmentMapFn           from {@link 
org.apache.druid.query.DataSource#createSegmentMapFunction(Query, AtomicLong)}
+   * @param skipIncrementalSegment whether in-memory {@link IncrementalIndex} 
segments should be skipped
+   */
+  @Nullable
+  public List<SinkSegmentReference> acquireSegmentReferences(
+      final Function<SegmentReference, SegmentReference> segmentMapFn,
+      final boolean skipIncrementalSegment
+  )
+  {
+    return acquireSegmentReferences(hydrants, segmentMapFn, 
skipIncrementalSegment);
+  }
+
   private boolean checkInDedupSet(InputRow row)
   {
     if (dedupColumn != null) {
@@ -335,7 +365,8 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
         .build();
 
     // Build the incremental-index according to the spec that was chosen by 
the user
-    final IncrementalIndex newIndex = appendableIndexSpec.builder()
+    final IncrementalIndex newIndex = appendableIndexSpec
+        .builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(maxRowsInMemory)
         .setMaxBytesInMemory(maxBytesInMemory)
@@ -452,4 +483,53 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
   {
     return shardSpec.getAtomicUpdateGroupSize();
   }
+
+  /**
+   * Helper for {@link #acquireSegmentReferences(Function, boolean)}. Separate 
method to simplify testing (we test this
+   * method instead of testing {@link #acquireSegmentReferences(Function, 
boolean)} directly).
+   */
+  @VisibleForTesting
+  static List<SinkSegmentReference> acquireSegmentReferences(
+      final List<FireHydrant> hydrants,
+      final Function<SegmentReference, SegmentReference> segmentMapFn,
+      final boolean skipIncrementalSegment
+  )
+  {
+    final List<SinkSegmentReference> retVal = new ArrayList<>(hydrants.size());
+
+    try {
+      for (final FireHydrant hydrant : hydrants) {
+        // Hydrant might swap at any point, but if it's swapped at the start
+        // then we know it's *definitely* swapped.
+        final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
+
+        if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
+          continue;
+        }
+
+        final Optional<Pair<SegmentReference, Closeable>> maybeHolder = 
hydrant.getSegmentForQuery(segmentMapFn);
+        if (maybeHolder.isPresent()) {
+          final Pair<SegmentReference, Closeable> holder = maybeHolder.get();
+          retVal.add(new SinkSegmentReference(hydrant.getCount(), holder.lhs, 
hydrantDefinitelySwapped, holder.rhs));
+        } else {
+          // Cannot acquire this hydrant. Release all others previously 
acquired and return null.
+          for (final SinkSegmentReference reference : retVal) {
+            reference.close();
+          }
+
+          return null;
+        }
+      }
+
+      return retVal;
+    }
+    catch (Throwable e) {
+      // Release all references previously acquired and throw the error.
+      for (final SinkSegmentReference reference : retVal) {
+        CloseableUtils.closeAndSuppressExceptions(reference, e::addSuppressed);
+      }
+
+      throw e;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java
new file mode 100644
index 00000000000..10dfc2b275e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/SinkSegmentReference.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.plumber;
+
+
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.Closeable;
+import java.util.function.Function;
+
+/**
+ * Segment reference returned by {@link 
Sink#acquireSegmentReferences(Function, boolean)}. Must be closed in order
+ * to release the reference.
+ */
+public class SinkSegmentReference implements Closeable
+{
+  private final int hydrantNumber;
+  private final SegmentReference segment;
+  private final boolean immutable;
+  private final Closeable releaser;
+
+  public SinkSegmentReference(int hydrantNumber, SegmentReference segment, 
boolean immutable, Closeable releaser)
+  {
+    this.hydrantNumber = hydrantNumber;
+    this.segment = segment;
+    this.immutable = immutable;
+    this.releaser = releaser;
+  }
+
+  /**
+   * Index of the {@link org.apache.druid.segment.realtime.FireHydrant} within 
the {@link Sink} that this segment
+   * reference came from.
+   */
+  public int getHydrantNumber()
+  {
+    return hydrantNumber;
+  }
+
+  /**
+   * The segment reference.
+   */
+  public SegmentReference getSegment()
+  {
+    return segment;
+  }
+
+  /**
+   * Whether the segment is immutable.
+   */
+  public boolean isImmutable()
+  {
+    return immutable;
+  }
+
+  @Override
+  public void close()
+  {
+    CloseableUtils.closeAndWrapExceptions(releaser);
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java 
b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
index 0085cb12ead..38c3fda1e7e 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
@@ -146,6 +146,22 @@ public class FireHydrantTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(0, queryableSegmentReference.getNumReferences());
   }
 
+  @Test
+  public void testGetSegmentForQuerySwappedWithNull()
+  {
+    ReferenceCountingSegment incrementalSegmentReference = 
hydrant.getHydrantSegment();
+    hydrant.swapSegment(null);
+    ReferenceCountingSegment queryableSegmentReference = 
hydrant.getHydrantSegment();
+    Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
+    Assert.assertNull(queryableSegmentReference);
+
+    Optional<Pair<SegmentReference, Closeable>> maybeSegmentAndCloseable = 
hydrant.getSegmentForQuery(
+        Function.identity()
+    );
+    Assert.assertEquals(0, incrementalSegmentReference.getNumReferences());
+    Assert.assertFalse(maybeSegmentAndCloseable.isPresent());
+  }
+
   @Test
   public void testGetSegmentForQueryButNotAbleToAcquireReferences()
   {
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java 
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
index 2d3bdc4ae7e..f208435037f 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
@@ -30,23 +30,36 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.RowAdapters;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
+import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 
 /**
+ *
  */
 public class SinkTest extends InitializedNullHandlingTest
 {
@@ -292,4 +305,70 @@ public class SinkTest extends InitializedNullHandlingTest
     ), false).getRowCount();
     Assert.assertTrue(rows == -2);
   }
+
+  @Test
+  public void testAcquireSegmentReferences_empty()
+  {
+    Assert.assertEquals(
+        Collections.emptyList(),
+        Sink.acquireSegmentReferences(Collections.emptyList(), 
Function.identity(), false)
+    );
+  }
+
+  @Test
+  public void testAcquireSegmentReferences_two() throws IOException
+  {
+    final List<FireHydrant> hydrants = twoHydrants();
+    final List<SinkSegmentReference> references = 
Sink.acquireSegmentReferences(hydrants, Function.identity(), false);
+    Assert.assertNotNull(references);
+    Assert.assertEquals(2, references.size());
+    Assert.assertEquals(0, references.get(0).getHydrantNumber());
+    Assert.assertFalse(references.get(0).isImmutable());
+    Assert.assertEquals(1, references.get(1).getHydrantNumber());
+    Assert.assertTrue(references.get(1).isImmutable());
+    CloseableUtils.closeAll(references);
+  }
+
+  @Test
+  public void testAcquireSegmentReferences_two_skipIncremental() throws 
IOException
+  {
+    final List<FireHydrant> hydrants = twoHydrants();
+    final List<SinkSegmentReference> references = 
Sink.acquireSegmentReferences(hydrants, Function.identity(), true);
+    Assert.assertNotNull(references);
+    Assert.assertEquals(1, references.size());
+    Assert.assertEquals(1, references.get(0).getHydrantNumber());
+    Assert.assertTrue(references.get(0).isImmutable());
+    CloseableUtils.closeAll(references);
+  }
+
+  @Test
+  public void testAcquireSegmentReferences_twoWithOneSwappedToNull()
+  {
+    // One segment has been swapped out. (Happens when sinks are being closed.)
+    final List<FireHydrant> hydrants = twoHydrants();
+    hydrants.get(1).swapSegment(null);
+
+    final List<SinkSegmentReference> references = 
Sink.acquireSegmentReferences(hydrants, Function.identity(), false);
+    Assert.assertNull(references);
+  }
+
+  /**
+   * Generate one in-memory hydrant, one not-in-memory hydrant.
+   */
+  private static List<FireHydrant> twoHydrants()
+  {
+    final SegmentId segmentId = SegmentId.dummy("foo");
+    return Arrays.asList(
+        new FireHydrant(EasyMock.createMock(IncrementalIndex.class), 0, 
segmentId),
+        new FireHydrant(
+            new RowBasedSegment<>(
+                segmentId,
+                Sequences.empty(),
+                RowAdapters.standardRow(),
+                RowSignature.empty()
+            ),
+            1
+        )
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to