surekhasaharan commented on a change in pull request #7653: Refactor 
SQLMetadataSegmentManager; Change contract of REST methods in 
DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r287892431
 
 

 ##########
 File path: 
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
 ##########
 @@ -197,366 +340,511 @@ private Runnable createPollTaskForStartOrder(long 
startOrder)
   }
 
   @Override
-  @LifecycleStop
-  public void stop()
+  public boolean isPollingDatabasePeriodically()
   {
-    ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+    // isPollingDatabasePeriodically() is synchronized together with 
startPollingDatabasePeriodically(),
+    // stopPollingDatabasePeriodically() and poll() to ensure that the latest 
currentStartPollingOrder is always
+    // visible. readLock should be used to avoid unexpected performance 
degradation of DruidCoordinator.
+    ReentrantReadWriteLock.ReadLock lock = startStopPollLock.readLock();
     lock.lock();
     try {
-      if (!isStarted()) {
+      return currentStartPollingOrder >= 0;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void stopPollingDatabasePeriodically()
+  {
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
+    try {
+      if (!isPollingDatabasePeriodically()) {
         return;
       }
 
-      dataSources = null;
-      currentStartOrder = -1;
-      exec.shutdownNow();
-      exec = null;
+      periodicPollTaskFuture.cancel(false);
+      latestDatabasePoll = null;
+
+      // NOT nulling dataSources, so that the latest polled data can still be queried even when this
+      // SegmentsMetadata object is stopped.
+
+      currentStartPollingOrder = -1;
     }
     finally {
       lock.unlock();
     }
   }
 
-  private Pair<DataSegment, Boolean> usedPayloadMapper(
-      final int index,
-      final ResultSet resultSet,
-      final StatementContext context
-  ) throws SQLException
+  private void awaitOrPerformDatabasePoll()
   {
+    // Double-checked locking with awaitLatestDatabasePoll() call playing the 
role of the "check".
+    if (awaitLatestDatabasePoll()) {
+      return;
+    }
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
     try {
-      return new Pair<>(
-          jsonMapper.readValue(resultSet.getBytes("payload"), 
DataSegment.class),
-          resultSet.getBoolean("used")
-      );
+      if (awaitLatestDatabasePoll()) {
+        return;
+      }
+      OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+      this.latestDatabasePoll = newOnDemandUpdate;
+      doOnDemandPoll(newOnDemandUpdate);
     }
-    catch (IOException e) {
-      throw new RuntimeException(e);
+    finally {
+      lock.unlock();
     }
   }
 
   /**
-   * Gets a list of all datasegments that overlap the provided interval along 
with thier used status.
+   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that was
+   * made no longer than {@link #periodicPollDelay} ago, awaits it and returns true; returns false otherwise,
+   * meaning that a new on-demand database poll should be initiated.
    */
-  private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
-      final String dataSource,
-      final Interval interval
-  )
+  private boolean awaitLatestDatabasePoll()
   {
-    return connector.inReadOnlyTransaction(
-        (handle, status) -> handle.createQuery(
-            StringUtils.format(
-                "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource 
AND start < :end AND %2$send%2$s > :start",
-                getSegmentsTable(),
-                connector.getQuoteString()
-            )
-        )
-        .setFetchSize(connector.getStreamingFetchSize())
-        .bind("dataSource", dataSource)
-        .bind("start", interval.getStart().toString())
-        .bind("end", interval.getEnd().toString())
-        .map(this::usedPayloadMapper)
-        .list()
-    );
+    DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+    if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
+      Futures.getUnchecked(((PeriodicDatabasePoll) 
latestDatabasePoll).firstPollCompletionFuture);
+      return true;
+    }
+    if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+      long periodicPollDelayNanos = 
TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+      OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) 
latestDatabasePoll;
+      boolean latestUpdateIsFresh = 
latestOnDemandPoll.nanosElapsedFromInitiation() < periodicPollDelayNanos;
+      if (latestUpdateIsFresh) {
+        Futures.getUnchecked(latestOnDemandPoll.pollCompletionFuture);
+        return true;
+      }
+      // Latest on-demand update is not fresh. Fall through to return false 
from this method.
+    } else {
+      assert latestDatabasePoll == null;
+      // No periodic updates and no on-demand database polls have been done yet, so there is nothing to await.
+    }
+    return false;
   }
 
-  private List<Pair<DataSegment, Boolean>> getDataSegments(
-      final String dataSource,
-      final Collection<String> segmentIds,
-      final Handle handle
-  )
+  private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
   {
-    return segmentIds.stream().map(
-        segmentId -> Optional.ofNullable(
-            handle.createQuery(
-                StringUtils.format(
-                    "SELECT used, payload FROM %1$s WHERE dataSource = 
:dataSource AND id = :id",
-                    getSegmentsTable()
-                )
-            )
-            .bind("dataSource", dataSource)
-            .bind("id", segmentId)
-            .map(this::usedPayloadMapper)
-            .first()
-        )
-        .orElseThrow(() -> new 
UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", 
segmentId)))
-    )
-    .collect(Collectors.toList());
-  }
-
-  /**
-   * Builds a VersionedIntervalTimeline containing used segments that overlap 
the intervals passed.
-   */
-  private VersionedIntervalTimeline<String, DataSegment> 
buildVersionedIntervalTimeline(
-      final String dataSource,
-      final Collection<Interval> intervals,
-      final Handle handle
-  )
-  {
-    return VersionedIntervalTimeline.forSegments(intervals
-        .stream()
-        .flatMap(interval -> handle.createQuery(
-                StringUtils.format(
-                    "SELECT payload FROM %1$s WHERE dataSource = :dataSource 
AND start < :end AND %2$send%2$s > :start AND used = true",
-                    getSegmentsTable(),
-                    connector.getQuoteString()
-                )
-            )
-            .setFetchSize(connector.getStreamingFetchSize())
-            .bind("dataSource", dataSource)
-            .bind("start", interval.getStart().toString())
-            .bind("end", interval.getEnd().toString())
-            .map((i, resultSet, context) -> {
-              try {
-                return jsonMapper.readValue(resultSet.getBytes("payload"), 
DataSegment.class);
-              }
-              catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            })
-            .list()
-            .stream()
-        )
-        .iterator()
-    );
+    try {
+      poll();
+      onDemandPoll.pollCompletionFuture.complete(null);
+    }
+    catch (Throwable t) {
+      onDemandPoll.pollCompletionFuture.completeExceptionally(t);
+      throw t;
+    }
   }
 
   @Override
-  public boolean enableDataSource(final String dataSource)
+  public boolean markSegmentAsUsed(final String segmentId)
   {
     try {
-      return enableSegments(dataSource, Intervals.ETERNITY) != 0;
+      int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
+          (Handle handle) -> handle
+              .createStatement(StringUtils.format("UPDATE %s SET used=true 
WHERE id = :id", getSegmentsTable()))
+              .bind("id", segmentId)
+              .execute()
+      );
+      // Unlike bulk markAsUsed methods: 
markAsUsedAllNonOvershadowedSegmentsInDataSource(),
+      // markAsUsedNonOvershadowedSegmentsInInterval(), and 
markAsUsedNonOvershadowedSegments() we don't put the marked
+      // segment into the respective data source, because we don't have it 
fetched from the database. It's probably not
+      // worth complicating the implementation and making two database queries 
just to add the segment because it will
+      // be fetched anyway during the next poll(). The segment putting that is done in the bulk markAsUsed methods is a
+      // nice-to-have thing, but doesn't formally affect the external guarantees of the SegmentsMetadata class.
+      return numUpdatedDatabaseEntries > 0;
     }
-    catch (Exception e) {
-      log.error(e, "Exception enabling datasource %s", dataSource);
-      return false;
+    catch (RuntimeException e) {
+      log.error(e, "Exception marking segment %s as used", segmentId);
+      throw e;
     }
   }
 
   @Override
-  public int enableSegments(final String dataSource, final Interval interval)
+  public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String 
dataSource)
   {
-    List<Pair<DataSegment, Boolean>> segments = 
getDataSegmentsOverlappingInterval(dataSource, interval);
-    List<DataSegment> segmentsToEnable = segments.stream()
-        .filter(segment -> !segment.rhs && 
interval.contains(segment.lhs.getInterval()))
-        .map(segment -> segment.lhs)
-        .collect(Collectors.toList());
-
-    VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = 
VersionedIntervalTimeline.forSegments(
-        segments.stream().filter(segment -> segment.rhs).map(segment -> 
segment.lhs).iterator()
-    );
-    VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, 
segmentsToEnable.iterator());
-
-    return enableSegments(
-        segmentsToEnable,
-        versionedIntervalTimeline
-    );
+    return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
   }
 
   @Override
-  public int enableSegments(final String dataSource, final Collection<String> 
segmentIds)
+  public int markAsUsedNonOvershadowedSegmentsInInterval(final String 
dataSource, final Interval interval)
   {
-    Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> 
data = connector.inReadOnlyTransaction(
-        (handle, status) -> {
-          List<DataSegment> segments = getDataSegments(dataSource, segmentIds, 
handle)
-              .stream()
-              .filter(pair -> !pair.rhs)
-              .map(pair -> pair.lhs)
-              .collect(Collectors.toList());
-
-          VersionedIntervalTimeline<String, DataSegment> 
versionedIntervalTimeline = buildVersionedIntervalTimeline(
-              dataSource,
-              JodaUtils.condenseIntervals(segments.stream().map(segment -> 
segment.getInterval()).collect(Collectors.toList())),
-              handle
-          );
-          VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, 
segments.iterator());
+    Preconditions.checkNotNull(interval);
+    return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
+  }
 
-          return new Pair<>(
-              segments,
-              versionedIntervalTimeline
-          );
+  /**
+   * Implementation for both {@link 
#markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is 
null)
+   * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
+   */
+  private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, 
@Nullable Interval interval)
+  {
+    List<DataSegment> usedSegmentsOverlappingInterval = new ArrayList<>();
+    List<DataSegment> unusedSegmentsInInterval = new ArrayList<>();
+    connector.inReadOnlyTransaction(
+        (handle, status) -> {
+          String queryString =
+              StringUtils.format("SELECT used, payload FROM %1$s WHERE 
dataSource = :dataSource", getSegmentsTable());
+          if (interval != null) {
+            queryString += StringUtils.format(" AND start < :end AND 
%1$send%1$s > :start", connector.getQuoteString());
+          }
+          Query<?> query = handle
+              .createQuery(queryString)
+              .setFetchSize(connector.getStreamingFetchSize())
+              .bind("dataSource", dataSourceName);
+          if (interval != null) {
+            query = query
+                .bind("start", interval.getStart().toString())
+                .bind("end", interval.getEnd().toString());
+          }
+          query = query
+              .map((int index, ResultSet resultSet, StatementContext context) 
-> {
+                try {
+                  DataSegment segment = 
jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
+                  if (resultSet.getBoolean("used")) {
+                    usedSegmentsOverlappingInterval.add(segment);
+                  } else {
+                    if (interval == null || 
interval.contains(segment.getInterval())) {
+                      unusedSegmentsInInterval.add(segment);
+                    }
+                  }
+                  return null;
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+          // Consume the query results to ensure 
usedSegmentsOverlappingInterval and unusedSegmentsInInterval are
+          // populated.
+          consume(query.iterator());
+          return null;
         }
     );
 
-    return enableSegments(
-        data.lhs,
-        data.rhs
+    VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = 
VersionedIntervalTimeline.forSegments(
+        Iterators.concat(usedSegmentsOverlappingInterval.iterator(), 
unusedSegmentsInInterval.iterator())
     );
+
+    return markNonOvershadowedSegmentsAsUsed(dataSourceName, 
unusedSegmentsInInterval, versionedIntervalTimeline);
   }
 
-  private int enableSegments(
-      final Collection<DataSegment> segments,
-      final VersionedIntervalTimeline<String, DataSegment> 
versionedIntervalTimeline
+  private static void consume(Iterator<?> iterator)
+  {
+    while (iterator.hasNext()) {
+      iterator.next();
+    }
+  }
+
+  /** Also puts non-overshadowed segments into {@link #dataSources}. */
+  private int markNonOvershadowedSegmentsAsUsed(
+      String dataSourceName,
+      List<DataSegment> unusedSegments,
+      VersionedIntervalTimeline<String, DataSegment> timeline
   )
   {
-    if (segments.isEmpty()) {
-      log.warn("No segments found to update!");
-      return 0;
+    @Nullable
+    DruidDataSource dataSource = null;
+    if (dataSources != null) {
+      dataSource = dataSources.computeIfAbsent(
+          dataSourceName,
+          dsName -> new DruidDataSource(dsName, 
createDefaultDataSourceProperties())
+      );
+    }
+    List<String> segmentIdsToMarkAsUsed = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {
+      if (timeline.isOvershadowed(segment.getInterval(), 
segment.getVersion())) {
+        continue;
+      }
+      if (dataSource != null) {
+        dataSource.addSegment(segment);
+      }
+      String s = segment.getId().toString();
+      segmentIdsToMarkAsUsed.add(s);
 
 Review comment:
   nit: rename `s` to `segmentId`, or get rid of `s` altogether and inline it:
   `segmentIdsToMarkAsUsed.add(segment.getId().toString())`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to