gianm commented on a change in pull request #7653: Refactor SQLMetadataSegmentManager; Change contract of REST methods in DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r286416635
 
 

 ##########
 File path: server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
 ##########
 @@ -197,366 +320,489 @@ private Runnable createPollTaskForStartOrder(long startOrder)
   }
 
   @Override
-  @LifecycleStop
-  public void stop()
+  public void stopPollingDatabasePeriodically()
   {
-    ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
     lock.lock();
     try {
-      if (!isStarted()) {
+      if (!isPollingDatabasePeriodically()) {
         return;
       }
 
-      dataSources = null;
-      currentStartOrder = -1;
-      exec.shutdownNow();
-      exec = null;
+      periodicPollTaskFuture.cancel(false);
+      latestDatabasePoll = null;
+
+      // NOT nulling dataSources, so that the latest polled data can still be queried even when this SegmentsMetadata
+      // object is stopped.
+
+      currentStartPollingOrder = -1;
     }
     finally {
       lock.unlock();
     }
   }
 
-  private Pair<DataSegment, Boolean> usedPayloadMapper(
-      final int index,
-      final ResultSet resultSet,
-      final StatementContext context
-  ) throws SQLException
+  private void awaitOrPerformDatabasePoll()
   {
+    // Double-checked locking with awaitPeriodicOrFreshOnDemandDatabasePoll() call playing the role of the "check".
+    if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+      return;
+    }
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
     try {
-      return new Pair<>(
-          jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class),
-          resultSet.getBoolean("used")
-      );
+      if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+        return;
+      }
+      OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+      this.latestDatabasePoll = newOnDemandUpdate;
+      doOnDemandPoll(newOnDemandUpdate);
     }
-    catch (IOException e) {
-      throw new RuntimeException(e);
+    finally {
+      lock.unlock();
     }
   }
 
-  /**
-   * Gets a list of all data segments that overlap the provided interval, along with their used status.
-   */
-  private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
-      final String dataSource,
-      final Interval interval
-  )
-  {
-    return connector.inReadOnlyTransaction(
-        (handle, status) -> handle.createQuery(
-            StringUtils.format(
-                "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource 
AND start < :end AND %2$send%2$s > :start",
-                getSegmentsTable(),
-                connector.getQuoteString()
-            )
-        )
-        .setFetchSize(connector.getStreamingFetchSize())
-        .bind("dataSource", dataSource)
-        .bind("start", interval.getStart().toString())
-        .bind("end", interval.getEnd().toString())
-        .map(this::usedPayloadMapper)
-        .list()
-    );
-  }
-
-  private List<Pair<DataSegment, Boolean>> getDataSegments(
-      final String dataSource,
-      final Collection<String> segmentIds,
-      final Handle handle
-  )
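+  /**
+   * If the latest poll is a periodic poll, or an on-demand poll fresher than periodicPollDelay, awaits its completion
+   * and returns true. Otherwise (no poll has happened yet, or the latest on-demand poll is stale) returns false.
+   */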
+  private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
   {
-    return segmentIds.stream().map(
-        segmentId -> Optional.ofNullable(
-            handle.createQuery(
-                StringUtils.format(
-                    "SELECT used, payload FROM %1$s WHERE dataSource = 
:dataSource AND id = :id",
-                    getSegmentsTable()
-                )
-            )
-            .bind("dataSource", dataSource)
-            .bind("id", segmentId)
-            .map(this::usedPayloadMapper)
-            .first()
-        )
-        .orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId)))
-    )
-    .collect(Collectors.toList());
+    DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+    if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
+      Futures.getUnchecked(((PeriodicDatabasePoll) latestDatabasePoll).firstPollCompletionFuture);
+      return true;
+    }
+    if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+      long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+      OnDemandDatabasePoll latestOnDemandUpdate = (OnDemandDatabasePoll) latestDatabasePoll;
+      boolean latestUpdateIsFresh = latestOnDemandUpdate.nanosElapsedFromInitiation() < periodicPollDelayNanos;
+      if (latestUpdateIsFresh) {
+        Futures.getUnchecked(latestOnDemandUpdate.pollCompletionFuture);
+        return true;
+      }
+      // Latest on-demand update is not fresh. Fall through to return false from this method.
+    } else {
+      assert latestDatabasePoll == null;
+    }
+    return false;
   }
 
-  /**
-   * Builds a VersionedIntervalTimeline containing used segments that overlap the intervals passed.
-   */
-  private VersionedIntervalTimeline<String, DataSegment> buildVersionedIntervalTimeline(
-      final String dataSource,
-      final Collection<Interval> intervals,
-      final Handle handle
-  )
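+  /** Runs poll() and completes the given poll's future, or completes it exceptionally and rethrows on failure. */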
+  private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
   {
-    return VersionedIntervalTimeline.forSegments(intervals
-        .stream()
-        .flatMap(interval -> handle.createQuery(
-                StringUtils.format(
-                    "SELECT payload FROM %1$s WHERE dataSource = :dataSource 
AND start < :end AND %2$send%2$s > :start AND used = true",
-                    getSegmentsTable(),
-                    connector.getQuoteString()
-                )
-            )
-            .setFetchSize(connector.getStreamingFetchSize())
-            .bind("dataSource", dataSource)
-            .bind("start", interval.getStart().toString())
-            .bind("end", interval.getEnd().toString())
-            .map((i, resultSet, context) -> {
-              try {
-                return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
-              }
-              catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            })
-            .list()
-            .stream()
-        )
-        .iterator()
-    );
+    try {
+      poll();
+      onDemandPoll.pollCompletionFuture.complete(null);
+    }
+    catch (Throwable t) {
+      onDemandPoll.pollCompletionFuture.completeExceptionally(t);
+      throw t;
+    }
   }
 
   @Override
-  public boolean enableDataSource(final String dataSource)
+  public boolean markSegmentAsUsed(final String segmentId)
   {
     try {
-      return enableSegments(dataSource, Intervals.ETERNITY) != 0;
+      int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
+          (Handle handle) -> handle
+              .createStatement(StringUtils.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable()))
+              .bind("id", segmentId)
+              .execute()
+      );
+      // Unlike the bulk markAsUsed methods (markAsUsedAllNonOvershadowedSegmentsInDataSource(),
+      // markAsUsedNonOvershadowedSegmentsInInterval(), and markAsUsedNonOvershadowedSegments()), we don't put the
+      // marked segment into the respective data source, because we haven't fetched it from the database. It's
+      // probably not worth complicating the implementation and making two database queries just to add the segment,
+      // because it will be fetched anyway during the next poll(). Putting segments into dataSources, as the bulk
+      // markAsUsed methods do, is a nice-to-have, but doesn't formally affect the external guarantees of the
+      // SegmentsMetadata class.
+      return numUpdatedDatabaseEntries > 0;
     }
-    catch (Exception e) {
-      log.error(e, "Exception enabling datasource %s", dataSource);
-      return false;
+    catch (RuntimeException e) {
+      log.error(e, "Exception marking segment %s as used", segmentId);
+      throw e;
     }
   }
 
   @Override
-  public int enableSegments(final String dataSource, final Interval interval)
+  public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource)
   {
-    List<Pair<DataSegment, Boolean>> segments = getDataSegmentsOverlappingInterval(dataSource, interval);
-    List<DataSegment> segmentsToEnable = segments.stream()
-        .filter(segment -> !segment.rhs && interval.contains(segment.lhs.getInterval()))
-        .map(segment -> segment.lhs)
-        .collect(Collectors.toList());
-
-    VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
-        segments.stream().filter(segment -> segment.rhs).map(segment -> segment.lhs).iterator()
-    );
-    VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segmentsToEnable.iterator());
-
-    return enableSegments(
-        segmentsToEnable,
-        versionedIntervalTimeline
-    );
+    return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
   }
 
   @Override
-  public int enableSegments(final String dataSource, final Collection<String> segmentIds)
+  public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval)
   {
-    Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> data = connector.inReadOnlyTransaction(
-        (handle, status) -> {
-          List<DataSegment> segments = getDataSegments(dataSource, segmentIds, handle)
-              .stream()
-              .filter(pair -> !pair.rhs)
-              .map(pair -> pair.lhs)
-              .collect(Collectors.toList());
-
-          VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = buildVersionedIntervalTimeline(
-              dataSource,
-              JodaUtils.condenseIntervals(segments.stream().map(segment -> segment.getInterval()).collect(Collectors.toList())),
-              handle
-          );
-          VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segments.iterator());
+    Preconditions.checkNotNull(interval);
+    return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
+  }
 
-          return new Pair<>(
-              segments,
-              versionedIntervalTimeline
-          );
+  /**
+   * Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null)
+   * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
+   */
+  private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval)
+  {
+    List<DataSegment> usedSegmentsOverlappingInterval = new ArrayList<>();
+    List<DataSegment> unusedSegmentsInInterval = new ArrayList<>();
+    connector.inReadOnlyTransaction(
+        (handle, status) -> {
+          String queryString =
+              StringUtils.format("SELECT used, payload FROM %1$s WHERE dataSource = :dataSource", getSegmentsTable());
+          if (interval != null) {
+            queryString += StringUtils.format(" AND start < :end AND %1$send%1$s > :start", connector.getQuoteString());
+          }
+          Query<?> query = handle
+              .createQuery(queryString)
+              .setFetchSize(connector.getStreamingFetchSize())
+              .bind("dataSource", dataSourceName);
+          if (interval != null) {
+            query = query
+                .bind("start", interval.getStart().toString())
+                .bind("end", interval.getEnd().toString());
+          }
+          query = query
+              .map((int index, ResultSet resultSet, StatementContext context) -> {
+                try {
+                  DataSegment segment = jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+                  if (resultSet.getBoolean("used")) {
+                    usedSegmentsOverlappingInterval.add(segment);
+                  } else {
+                    if (interval == null || interval.contains(segment.getInterval())) {
+                      unusedSegmentsInInterval.add(segment);
+                    }
+                  }
+                  return null;
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+          // Consume the query results to ensure usedSegmentsOverlappingInterval and unusedSegmentsInInterval are
+          // populated.
+          consume(query.iterator());
+          return null;
         }
     );
 
-    return enableSegments(
-        data.lhs,
-        data.rhs
+    VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
+        Iterators.concat(usedSegmentsOverlappingInterval.iterator(), unusedSegmentsInInterval.iterator())
     );
+
+    return markNonOvershadowedSegmentsAsUsed(dataSourceName, unusedSegmentsInInterval, versionedIntervalTimeline);
+  }
+
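+  /** Drains the given iterator, discarding its elements; called above solely for the side effects of the row mapper. */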
+  private static void consume(Iterator<?> iterator)
+  {
+    while (iterator.hasNext()) {
+      iterator.next();
+    }
   }
 
-  private int enableSegments(
-      final Collection<DataSegment> segments,
-      final VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline
+  /** Also puts non-overshadowed segments into {@link #dataSources}. */
+  private int markNonOvershadowedSegmentsAsUsed(
+      String dataSourceName,
+      List<DataSegment> unusedSegments,
+      VersionedIntervalTimeline<String, DataSegment> timeline
   )
   {
-    if (segments.isEmpty()) {
-      log.warn("No segments found to update!");
-      return 0;
+    @Nullable
+    DruidDataSource dataSource = null;
+    if (dataSources != null) {
+      dataSource = dataSources.computeIfAbsent(
+          dataSourceName,
+          dsName -> new DruidDataSource(dsName, createDefaultDataSourceProperties())
+      );
+    }
+    List<String> segmentIdsToMarkAsUsed = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {
+      if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion())) {
+        continue;
+      }
+      if (dataSource != null) {
+        dataSource.addSegment(segment);
+      }
+      String s = segment.getId().toString();
+      segmentIdsToMarkAsUsed.add(s);
     }
 
-    return connector.getDBI().withHandle(handle -> {
-      Batch batch = handle.createBatch();
-      segments
-          .stream()
-          .map(segment -> segment.getId())
-          .filter(segmentId -> !versionedIntervalTimeline.isOvershadowed(
-              segmentId.getInterval(),
-              segmentId.getVersion()
-          ))
-          .forEach(segmentId -> batch.add(
-              StringUtils.format(
-                  "UPDATE %s SET used=true WHERE id = '%s'",
-                  getSegmentsTable(),
-                  segmentId
-              )
-          ));
-      return batch.execute().length;
-    });
+    return markSegmentsAsUsed(segmentIdsToMarkAsUsed);
   }
 
   @Override
-  public boolean enableSegment(final String segmentId)
+  public int markAsUsedNonOvershadowedSegments(final String dataSource, final Set<String> segmentIds)
+      throws UnknownSegmentIdException
   {
     try {
-      connector.getDBI().withHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle)
-            {
-              handle.createStatement(StringUtils.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable()))
-                    .bind("id", segmentId)
-                    .execute();
-              return null;
-            }
-          }
-      );
+      Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> unusedSegmentsAndTimeline = connector
+          .inReadOnlyTransaction(
+              (handle, status) -> {
+                List<DataSegment> unusedSegments = retreiveUnusedSegments(dataSource, segmentIds, handle);
+                List<Interval> unusedSegmentsIntervals = JodaUtils.condenseIntervals(
+                    unusedSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+                );
+                Iterator<DataSegment> usedSegmentsOverlappingUnusedSegmentsIntervals =
+                    retreiveUsedSegmentsOverlappingIntervals(dataSource, unusedSegmentsIntervals, handle);
+                VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
+                    Iterators.concat(usedSegmentsOverlappingUnusedSegmentsIntervals, unusedSegments.iterator())
+                );
+                );
+                return new Pair<>(unusedSegments, timeline);
+              }
+          );
+
+      List<DataSegment> unusedSegments = unusedSegmentsAndTimeline.lhs;
+      VersionedIntervalTimeline<String, DataSegment> timeline = unusedSegmentsAndTimeline.rhs;
+      return markNonOvershadowedSegmentsAsUsed(dataSource, unusedSegments, timeline);
     }
     catch (Exception e) {
-      log.error(e, "Exception enabling segment %s", segmentId);
-      return false;
+      Throwable rootCause = Throwables.getRootCause(e);
+      if (rootCause instanceof UnknownSegmentIdException) {
+        throw (UnknownSegmentIdException) rootCause;
+      } else {
+        throw e;
+      }
+    }
+  }
+
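+  /**
+   * Returns the unused segments among the given segment ids (used segments are skipped). Throws
+   * UnknownSegmentIdException if any of the given ids is not found in the database.
+   */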
+  private List<DataSegment> retreiveUnusedSegments(
+      final String dataSource,
+      final Set<String> segmentIds,
+      final Handle handle
+  ) throws UnknownSegmentIdException
+  {
+    List<String> unknownSegmentIds = new ArrayList<>();
+    List<DataSegment> segments = segmentIds
+        .stream()
+        .map(
+            segmentId -> {
+              Iterator<DataSegment> segmentResultIterator = handle
+                  .createQuery(
+                      StringUtils.format(
+                          "SELECT used, payload FROM %1$s WHERE dataSource = 
:dataSource AND id = :id",
+                          getSegmentsTable()
+                      )
+                  )
+                  .bind("dataSource", dataSource)
+                  .bind("id", segmentId)
+                  .map((int index, ResultSet resultSet, StatementContext context) -> {
+                    try {
+                      if (!resultSet.getBoolean("used")) {
+                        return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+                      } else {
+                        // We emit nulls for used segments. They are filtered out below in this method.
+                        return null;
+                      }
+                    }
+                    catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                  })
+                  .iterator();
+              if (!segmentResultIterator.hasNext()) {
+                unknownSegmentIds.add(segmentId);
+                return null;
+              } else {
+                @Nullable DataSegment segment = segmentResultIterator.next();
+                if (segmentResultIterator.hasNext()) {
+                  log.error(
+                      "There is more than one row corresponding to segment id 
[%s] in data source [%s] in the database",
+                      segmentId,
+                      dataSource
+                  );
+                }
+                return segment;
+              }
+            }
+        )
+        .filter(Objects::nonNull) // Filter nulls corresponding to used segments.
+        .collect(Collectors.toList());
+    if (!unknownSegmentIds.isEmpty()) {
+      throw new UnknownSegmentIdException(unknownSegmentIds);
+    }
+    return segments;
+  }
+
+  private Iterator<DataSegment> retreiveUsedSegmentsOverlappingIntervals(
 
 Review comment:
   retrieve (spelling)
