github-code-scanning[bot] commented on code in PR #14985:
URL: https://github.com/apache/druid/pull/14985#discussion_r1359403691


##########
server/src/test/java/org/apache/druid/segment/metadata/TestSegmentMetadataQueryWalker.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.client.CoordinatorServerView;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.FunctionalIterable;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BySegmentQueryRunner;
+import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TestSegmentMetadataQueryWalker extends SegmentMetadataQuerySegmentWalker
+{
+  private QueryRunnerFactoryConglomerate conglomerate;
+  private Map<SegmentDescriptor, Pair<QueryableIndex, DataSegment>> queryableIndexMap;
+
+  public TestSegmentMetadataQueryWalker(
+      CoordinatorServerView serverView,
+      DruidHttpClientConfig httpClientConfig,
+      QueryToolChestWarehouse warehouse,
+      ServerConfig serverConfig,
+      ServiceEmitter emitter,
+      QueryRunnerFactoryConglomerate conglomerate,
+      Map<SegmentDescriptor, Pair<QueryableIndex, DataSegment>> queryableIndexMap
+  )
+  {
+    super(
+        serverView,
+        httpClientConfig,
+        warehouse,
+        serverConfig,
+        emitter
+    );
+    this.conglomerate = conglomerate;
+    this.queryableIndexMap = queryableIndexMap;
+  }
+
+  public void add(DataSegment segment, QueryableIndex index)
+  {
+    queryableIndexMap.put(segment.toDescriptor(), Pair.of(index, segment));
+  }
+
+  @Override
+  <T> Sequence<T> getServerResults(
+      QueryRunner serverRunner,
+      QueryPlus<T> queryPlus,
+      ResponseContext responseContext,
+      long maxQueuedBytesPerServer,
+      List<SegmentDescriptor> segmentDescriptors
+  )
+  {
+    QueryRunnerFactory factory = conglomerate.findFactory(queryPlus.getQuery());
+    QueryToolChest toolChest = factory.getToolchest();
+
+    return new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(
+                Execs.directExecutor(),
+                FunctionalIterable
+                    .create(segmentDescriptors)
+                    .transform(
+                        segment ->
+                            new BySegmentQueryRunner<>(
+                                queryableIndexMap.get(segment).rhs.getId(),
+                                queryableIndexMap.get(segment).rhs.getInterval().getStart(),
+                                factory.createRunner(
+                                    new QueryableIndexSegment(
+                                        queryableIndexMap.get(segment).lhs,
+                                        queryableIndexMap.get(segment).rhs.getId()))
+                            )
+                    )
+            )

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [QueryRunnerFactory.mergeRunners](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5902)
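
   One way to resolve this, assuming the `QueryProcessingPool` overload of `mergeRunners` is the intended replacement for the deprecated `ExecutorService` variant: pass `DirectQueryProcessingPool.INSTANCE` (Druid's same-thread processing pool) instead of `Execs.directExecutor()`. A minimal sketch of the override above with that swap; the trailing `run(...)` call is an assumption, since the diff excerpt is truncated:

   ```java
   // Requires: import org.apache.druid.query.DirectQueryProcessingPool;
   @Override
   <T> Sequence<T> getServerResults(
       QueryRunner serverRunner,
       QueryPlus<T> queryPlus,
       ResponseContext responseContext,
       long maxQueuedBytesPerServer,
       List<SegmentDescriptor> segmentDescriptors
   )
   {
     QueryRunnerFactory factory = conglomerate.findFactory(queryPlus.getQuery());
     QueryToolChest toolChest = factory.getToolchest();

     return new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
             factory.mergeRunners(
                 // Same-thread QueryProcessingPool; replaces the deprecated
                 // mergeRunners(ExecutorService, ...) call that used
                 // Execs.directExecutor().
                 DirectQueryProcessingPool.INSTANCE,
                 FunctionalIterable
                     .create(segmentDescriptors)
                     .transform(
                         segment ->
                             new BySegmentQueryRunner<>(
                                 queryableIndexMap.get(segment).rhs.getId(),
                                 queryableIndexMap.get(segment).rhs.getInterval().getStart(),
                                 factory.createRunner(
                                     new QueryableIndexSegment(
                                         queryableIndexMap.get(segment).lhs,
                                         queryableIndexMap.get(segment).rhs.getId()))
                             )
                     )
             )
         ),
         toolChest
     ).run(queryPlus, responseContext); // assumed completion of the truncated method
   }
   ```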



##########
sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java:
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.BrokerServerView;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DirectDruidClientFactory;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.FilteredServerInventoryView;
+import org.apache.druid.client.FilteringSegmentCallback;
+import org.apache.druid.client.InternalQueryConfig;
+import org.apache.druid.client.ServerView;
+import org.apache.druid.client.TimelineServerView;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
+import org.apache.druid.client.selector.RandomServerSelectorStrategy;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory;
+import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
+import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+public class BrokerSegmentMetadataCacheConcurrencyTest extends BrokerSegmentMetadataCacheCommon
+{
+  private static final String DATASOURCE = "datasource";
+  static final BrokerSegmentMetadataCacheConfig SEGMENT_CACHE_CONFIG_DEFAULT = BrokerSegmentMetadataCacheConfig.create("PT1S");
+  private File tmpDir;
+  private TestServerInventoryView inventoryView;
+  private BrokerServerView serverView;
+  private AbstractSegmentMetadataCache schema;
+  private ExecutorService exec;
+
+  @Before
+  @Override
+  public void setUp() throws Exception
+  {
+    super.setUp();
+    tmpDir = temporaryFolder.newFolder();
+    // walker = new SpecificSegmentsQuerySegmentWalker(conglomerate);
+    inventoryView = new TestServerInventoryView();
+    serverView = newBrokerServerView(inventoryView);
+    inventoryView.init();
+    serverView.awaitInitialization();
+    exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d");
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception
+  {
+    super.tearDown();
+    exec.shutdownNow();
+  }
+
+  /**
+   * This tests the contention between three components, {@link AbstractSegmentMetadataCache},
+   * {@code InventoryView}, and {@link BrokerServerView}. It first triggers
+   * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with
+   * {@link AbstractSegmentMetadataCache#lock}, {@link AbstractSegmentMetadataCache#buildDruidTable}
+   * is overridden to sleep before doing real work. While refreshing
+   * {@code SegmentMetadataCache}, more new segments are added to
+   * {@code InventoryView}, which triggers updates of {@code BrokerServerView}.
+   * Finally, while {@code BrokerServerView} is updated,
+   * {@link BrokerServerView#getTimeline} is continuously called to mimic user query
+   * processing. All these calls must return without heavy contention.
+   */
+  @Test(timeout = 30000L)
+  public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline()
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    schema = new BrokerSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), segmentManager),
+        new NoopCoordinatorClient()
+    )
+    {
+      @Override
+      public RowSignature buildDruidTable(final String dataSource)
+      {
+        doInLock(() -> {
+          try {
+            // Mimic some heavy work done in lock in DruidSchema
+            Thread.sleep(5000);
+          }
+          catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        });
+        return super.buildDruidTable(dataSource);
+      }
+    };
+
+    int numExistingSegments = 100;
+    int numServers = 19;
+    CountDownLatch segmentLoadLatch = new CountDownLatch(numExistingSegments);
+    serverView.registerTimelineCallback(
+        Execs.directExecutor(),
+        new TimelineServerView.TimelineCallback()
+        {
+          @Override
+          public ServerView.CallbackAction timelineInitialized()
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
+          {
+            segmentLoadLatch.countDown();
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction segmentRemoved(DataSegment segment)
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment)
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+        }
+    );
+    addSegmentsToCluster(0, numServers, numExistingSegments);
+    // Wait for all segments to be loaded in BrokerServerView
+    Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS));
+
+    // Trigger refresh of DruidSchema. This will internally run the heavy work
+    // mimicked by the overridden buildDruidTable
+    Future<?> refreshFuture = exec.submit(() -> {
+      schema.refresh(
+          walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()),
+          Sets.newHashSet(DATASOURCE)
+      );
+      return null;
+    });
+
+    // Trigger updates of BrokerServerView. This should be done asynchronously.
+    addSegmentsToCluster(numExistingSegments, numServers, 50); // add completely new segments
+    addReplicasToCluster(1, numServers, 30); // add replicas of the first 30 segments.
+    // for the first 30 segments, we will still have replicas.
+    // for the other 20 segments, they will be completely removed from the cluster.
+    removeSegmentsFromCluster(numServers, 50);
+    Assert.assertFalse(refreshFuture.isDone());
+
+    for (int i = 0; i < 1000; i++) {
+      boolean hasTimeline = exec.submit(
+          () -> serverView.getTimeline((new TableDataSource(DATASOURCE)).getAnalysis())
+                          .isPresent()
+      ).get(100, TimeUnit.MILLISECONDS);
+      Assert.assertTrue(hasTimeline);
+      // We want to call getTimeline while BrokerServerView is being updated. Sleep might help with timing.
+      Thread.sleep(2);
+    }
+
+    refreshFuture.get(10, TimeUnit.SECONDS);
+  }
+
+  /**
+   * This tests the contention between two methods of {@link AbstractSegmentMetadataCache}:
+   * {@link AbstractSegmentMetadataCache#refresh} and
+   * {@link AbstractSegmentMetadataCache#getSegmentMetadataSnapshot()}. It first triggers
+   * refreshing {@code SegmentMetadataCache}. To mimic some heavy work done with
+   * {@link AbstractSegmentMetadataCache#lock}, {@link AbstractSegmentMetadataCache#buildDruidTable}
+   * is overridden to sleep before doing real work. While refreshing
+   * {@code SegmentMetadataCache}, {@code getSegmentMetadataSnapshot()} is continuously
+   * called to mimic reading the segments table of SystemSchema. All these calls
+   * must return without heavy contention.
+   */
+  @Test(timeout = 30000L)
+  public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata()
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    schema = new BrokerSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), segmentManager),
+        new NoopCoordinatorClient()
+    )
+    {
+      @Override
+      public RowSignature buildDruidTable(final String dataSource)
+      {
+        doInLock(() -> {
+          try {
+            // Mimic some heavy work done in lock in SegmentMetadataCache
+            Thread.sleep(5000);
+          }
+          catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        });
+        return super.buildDruidTable(dataSource);
+      }
+    };
+
+    int numExistingSegments = 100;
+    int numServers = 19;
+    CountDownLatch segmentLoadLatch = new CountDownLatch(numExistingSegments);
+    serverView.registerTimelineCallback(
+        Execs.directExecutor(),
+        new TimelineServerView.TimelineCallback()
+        {
+          @Override
+          public ServerView.CallbackAction timelineInitialized()
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
+          {
+            segmentLoadLatch.countDown();
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction segmentRemoved(DataSegment segment)
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment)
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+        }
+    );
+    addSegmentsToCluster(0, numServers, numExistingSegments);
+    // Wait for all segments to be loaded in BrokerServerView
+    Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS));
+
+    // Trigger refresh of SegmentMetadataCache. This will internally run the heavy work mimicked
+    // by the overridden buildDruidTable
+    Future<?> refreshFuture = exec.submit(() -> {
+      schema.refresh(
+          walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()),
+          Sets.newHashSet(DATASOURCE)
+      );
+      return null;
+    });
+    Assert.assertFalse(refreshFuture.isDone());
+
+    for (int i = 0; i < 1000; i++) {
+      Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = exec.submit(
+          () -> schema.getSegmentMetadataSnapshot()
+      ).get(100, TimeUnit.MILLISECONDS);
+      Assert.assertFalse(segmentsMetadata.isEmpty());
+      // We want to call getSegmentMetadataSnapshot while refreshing. Sleep might help with timing.
+      Thread.sleep(2);
+    }
+
+    refreshFuture.get(10, TimeUnit.SECONDS);
+  }
+
+  private void addSegmentsToCluster(int partitionIdStart, int numServers, int numSegments)
+  {
+    for (int i = 0; i < numSegments; i++) {
+      DataSegment segment = newSegment(i + partitionIdStart);
+      QueryableIndex index = newQueryableIndex(i + partitionIdStart);

Review Comment:
   ## Unread local variable
   
   Variable 'QueryableIndex index' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5901)
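
   A minimal sketch of two ways to address this, assuming the loop body is otherwise unchanged. Which fix is right depends on whether the index was meant to be consumed, for example via the `TestSegmentMetadataQueryWalker.add(DataSegment, QueryableIndex)` method added earlier in this PR (the `walker` reference below is an assumption about what is in scope here):

   ```java
   for (int i = 0; i < numSegments; i++) {
     DataSegment segment = newSegment(i + partitionIdStart);
     // Option 1: consume the index by registering it with the test walker
     // (hypothetical wiring; `walker` is assumed to be in scope):
     walker.add(segment, newQueryableIndex(i + partitionIdStart));
     // Option 2: if the index is genuinely unneeded here, delete the
     // `QueryableIndex index = newQueryableIndex(...)` line instead.
     // ... rest of the loop body (truncated in the excerpt above) ...
   }
   ```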



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

