This is an automated email from the ASF dual-hosted git repository. jihoonson pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 8ebb7b5 Handoff should ignore segments that are dropped by drop rules (#6676) 8ebb7b5 is described below commit 8ebb7b558b617d8807e6bdc204cabbda9a4ac346 Author: Mingming Qiu <csurj...@gmail.com> AuthorDate: Tue Jan 8 06:43:11 2019 +0800 Handoff should ignore segments that are dropped by drop rules (#6676) * Handoff should ignore segments that are dropped by drop rules * fix travis-ci * fix tests * address comments * remove line added by accident * address comments * add javadoc and logging the full stack trace of exception * add error message --- .../client/coordinator/CoordinatorClient.java | 34 ++- .../CoordinatorBasedSegmentHandoffNotifier.java | 37 +-- .../druid/server/http/DatasourcesResource.java | 89 +++++++ ...CoordinatorBasedSegmentHandoffNotifierTest.java | 225 +--------------- .../druid/server/http/DatasourcesResourceTest.java | 283 ++++++++++++++++++++- 5 files changed, 388 insertions(+), 280 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index 4f59d87..ef731b3 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -23,16 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.inject.Inject; -import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.query.SegmentDescriptor; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.joda.time.Interval; - -import java.util.List; public class CoordinatorClient { @@ -49,18 +46,20 @@ public class CoordinatorClient this.druidLeaderClient = druidLeaderClient; } - - public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk) + public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor) { try { FullResponseHolder response = druidLeaderClient.go( - druidLeaderClient.makeRequest(HttpMethod.GET, - StringUtils.format( - "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s", - dataSource, - interval.toString().replace('/', '_'), - incompleteOk - )) + druidLeaderClient.makeRequest( + HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s", + dataSource, + descriptor.getInterval(), + descriptor.getPartitionNumber(), + descriptor.getVersion() + ) + ) ); if (!response.getStatus().equals(HttpResponseStatus.OK)) { @@ -70,12 +69,9 @@ public class CoordinatorClient response.getContent() ); } - return jsonMapper.readValue( - response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>() - { - - } - ); + return jsonMapper.readValue(response.getContent(), new TypeReference<Boolean>() + { + }); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java index 6d06215..028183f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -19,18 +19,13 @@ package org.apache.druid.segment.realtime.plumber; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.server.coordination.DruidServerMetadata; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -95,13 +90,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next(); SegmentDescriptor descriptor = entry.getKey(); try { - List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView( - dataSource, - descriptor.getInterval(), - true - ); - - if (isHandOffComplete(loadedSegments, entry.getKey())) { + if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) { log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor); entry.getValue().lhs.execute(entry.getValue().rhs); itr.remove(); @@ -131,30 +120,6 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot } } - - static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor) - { - for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) { - if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) - && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() - == descriptor.getPartitionNumber() - && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 - && Iterables.any( - segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>() - { - @Override - public boolean apply(DruidServerMetadata input) - { - return input.segmentReplicatable(); - } - } - )) { - return true; - } - } - return false; - } - @Override public void close() { diff --git a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java index 450dbd4..03d3e6a 100644 --- a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java @@ -37,8 +37,13 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.MetadataSegmentManager; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordinator.rules.LoadRule; +import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizerMapper; @@ -85,6 +90,7 @@ public class DatasourcesResource private final CoordinatorServerView serverInventoryView; private final MetadataSegmentManager databaseSegmentManager; + private final MetadataRuleManager databaseRuleManager; private final IndexingServiceClient indexingServiceClient; private final AuthConfig authConfig; private final AuthorizerMapper authorizerMapper; @@ -93,6 +99,7 @@ public class DatasourcesResource public DatasourcesResource( CoordinatorServerView serverInventoryView, MetadataSegmentManager databaseSegmentManager, + MetadataRuleManager databaseRuleManager, @Nullable IndexingServiceClient indexingServiceClient, AuthConfig authConfig, AuthorizerMapper authorizerMapper @@ -100,6 +107,7 @@ public class DatasourcesResource { this.serverInventoryView = serverInventoryView; this.databaseSegmentManager = databaseSegmentManager; + this.databaseRuleManager = databaseRuleManager; this.indexingServiceClient = indexingServiceClient; this.authConfig = authConfig; this.authorizerMapper = authorizerMapper; @@ -647,4 +655,85 @@ public class DatasourcesResource ); return Response.ok(retval).build(); } + + /** + * Used by the realtime tasks to learn whether a segment is handed off or not. + * It returns true when the segment will never be handed off or is already handed off. Otherwise, it returns false. + */ + @GET + @Path("/{dataSourceName}/handoffComplete") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(DatasourceResourceFilter.class) + public Response isHandOffComplete( + @PathParam("dataSourceName") String dataSourceName, + @QueryParam("interval") final String interval, + @QueryParam("partitionNumber") final int partitionNumber, + @QueryParam("version") final String version + ) + { + try { + final List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSourceName); + final Interval theInterval = Intervals.of(interval); + final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber); + final DateTime now = DateTimes.nowUtc(); + // dropped means a segment will never be handed off, i.e it completed hand off + // init to true, reset to false only if this segment can be loaded by rules + boolean dropped = true; + for (Rule rule : rules) { + if (rule.appliesTo(theInterval, now)) { + if (rule instanceof LoadRule) { + dropped = false; + } + break; + } + } + if (dropped) { + return Response.ok(true).build(); + } + + TimelineLookup<String, SegmentLoadInfo> timeline = serverInventoryView.getTimeline( + new TableDataSource(dataSourceName) + ); + if (timeline == null) { + log.debug("No timeline found for datasource[%s]", dataSourceName); + return Response.ok(false).build(); + } + + Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup = timeline.lookupWithIncompletePartitions( + theInterval); + FunctionalIterable<ImmutableSegmentLoadInfo> loadInfoIterable = FunctionalIterable + .create(lookup).transformCat( + (TimelineObjectHolder<String, SegmentLoadInfo> input) -> + Iterables.transform( + input.getObject(), + (PartitionChunk<SegmentLoadInfo> chunk) -> + chunk.getObject().toImmutableSegmentLoadInfo() + ) + ); + if (isSegmentLoaded(loadInfoIterable, descriptor)) { + return Response.ok(true).build(); + } + + return Response.ok(false).build(); + } + catch (Exception e) { + log.error(e, "Error while handling hand off check request"); + return Response.serverError().entity(ImmutableMap.of("error", e.toString())).build(); + } + } + + static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor) + { + for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) { + if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) + && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber() + && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 + && Iterables.any( + segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable + )) { + return true; + } + } + return false; + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java index 1596415..228d795 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -19,23 +19,16 @@ package org.apache.druid.segment.realtime.plumber; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; -import junit.framework.Assert; -import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.joda.time.Duration; import org.joda.time.Interval; +import org.junit.Assert; import org.junit.Test; -import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; public class CoordinatorBasedSegmentHandoffNotifierTest @@ -55,27 +48,10 @@ public class CoordinatorBasedSegmentHandoffNotifierTest { Interval interval = Intervals.of("2011-04-01/2011-04-02"); SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2); - DataSegment segment = new DataSegment( - "test_ds", - interval, - "v1", - null, - null, - null, - new NumberedShardSpec(2, 3), - 0, 0 - ); CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); - EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true)) - .andReturn( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - segment, - Sets.newHashSet(createRealtimeServerMetadata("a1")) - ) - ) - ) + EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor)) + .andReturn(false) .anyTimes(); EasyMock.replay(coordinatorClient); CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( @@ -102,27 +78,11 @@ public class CoordinatorBasedSegmentHandoffNotifierTest { Interval interval = Intervals.of("2011-04-01/2011-04-02"); SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2); - DataSegment segment = new DataSegment( - "test_ds", - interval, - "v1", - null, - null, - null, - new NumberedShardSpec(2, 3), - 0, 0 - ); + final AtomicBoolean callbackCalled = new AtomicBoolean(false); CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); - EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true)) - .andReturn( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - segment, - Sets.newHashSet(createHistoricalServerMetadata("a1")) - ) - ) - ) + EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor)) + .andReturn(true) .anyTimes(); EasyMock.replay(coordinatorClient); CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( @@ -144,177 +104,4 @@ public class CoordinatorBasedSegmentHandoffNotifierTest Assert.assertTrue(callbackCalled.get()); EasyMock.verify(coordinatorClient); } - - @Test - public void testHandoffChecksForVersion() - { - Interval interval = Intervals.of( - "2011-04-01/2011-04-02" - ); - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v2", 2) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v2", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - } - - @Test - public void testHandoffChecksForAssignableServer() - { - Interval interval = Intervals.of( - "2011-04-01/2011-04-02" - ); - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 2), - Sets.newHashSet(createRealtimeServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - } - - @Test - public void testHandoffChecksForPartitionNumber() - { - Interval interval = Intervals.of( - "2011-04-01/2011-04-02" - ); - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 1) - ) - ); - - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(interval, "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(interval, "v1", 2) - ) - ); - - } - - @Test - public void testHandoffChecksForInterval() - { - - Assert.assertFalse( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1) - ) - ); - - Assert.assertTrue( - CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( - Collections.singletonList( - new ImmutableSegmentLoadInfo( - createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1), - Sets.newHashSet(createHistoricalServerMetadata("a")) - ) - ), - new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1) - ) - ); - } - - private DruidServerMetadata createRealtimeServerMetadata(String name) - { - return createServerMetadata(name, ServerType.REALTIME); - } - - private DruidServerMetadata createHistoricalServerMetadata(String name) - { - return createServerMetadata(name, ServerType.HISTORICAL); - } - - private DruidServerMetadata createServerMetadata(String name, ServerType type) - { - return new DruidServerMetadata( - name, - name, - null, - 10000, - type, - "tier", - 1 - ); - } - - private DataSegment createSegment(Interval interval, String version, int partitionNumber) - { - return new DataSegment( - "test_ds", - interval, - version, - null, - null, - null, - new NumberedShardSpec(partitionNumber, 100), - 0, 0 - ); - } } diff --git a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java index d42b678..3d2c786 100644 --- a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java @@ -21,13 +21,23 @@ package org.apache.druid.server.http; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.SegmentLoadInfo; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.metadata.MetadataRuleManager; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.rules.IntervalDropRule; +import org.apache.druid.server.coordinator.rules.IntervalLoadRule; +import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; @@ -37,6 +47,11 @@ import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Resource; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.NumberedPartitionChunk; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.timeline.partition.PartitionHolder; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; @@ -46,6 +61,7 @@ import org.junit.Test; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -156,6 +172,7 @@ public class DatasourcesResourceTest inventoryView, null, null, + null, new AuthConfig(), AuthTestUtils.TEST_AUTHORIZER_MAPPER ); @@ -240,6 +257,7 @@ public class DatasourcesResourceTest inventoryView, null, null, + null, new AuthConfig(), authMapper ); @@ -294,6 +312,7 @@ public class DatasourcesResourceTest inventoryView, null, null, + null, new AuthConfig(), AuthTestUtils.TEST_AUTHORIZER_MAPPER ); @@ -323,7 +342,7 @@ public class DatasourcesResourceTest ).atLeastOnce(); EasyMock.replay(inventoryView, server); - DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null); Response response = datasourcesResource.getTheDataSource("datasource1", "full"); ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -340,7 +359,7 @@ public class DatasourcesResourceTest ).atLeastOnce(); EasyMock.replay(inventoryView, server); - DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null); Assert.assertEquals(204, datasourcesResource.getTheDataSource("none", null).getStatus()); EasyMock.verify(inventoryView, server); } @@ -361,7 +380,7 @@ public class DatasourcesResourceTest ).atLeastOnce(); EasyMock.replay(inventoryView, server); - DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null); Response response = datasourcesResource.getTheDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity(); @@ -400,7 +419,7 @@ public class DatasourcesResourceTest ).atLeastOnce(); EasyMock.replay(inventoryView, server, server2, server3); - DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null); Response response = datasourcesResource.getTheDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity(); @@ -431,7 +450,7 @@ public class DatasourcesResourceTest List<Interval> expectedIntervals = new ArrayList<>(); expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z")); expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z")); - DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null); Response response = datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null, null); Assert.assertEquals(response.getEntity(), null); @@ -478,7 +497,7 @@ public class DatasourcesResourceTest ).atLeastOnce(); EasyMock.replay(inventoryView); - DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null); Response response = datasourcesResource.getSegmentDataSourceSpecificInterval( "invalidDataSource", "2010-01-01/P1D", @@ -548,6 +567,7 @@ public class DatasourcesResourceTest DatasourcesResource datasourcesResource = new DatasourcesResource( inventoryView, null, + null, indexingServiceClient, new AuthConfig(), null @@ -567,6 +587,7 @@ public class DatasourcesResourceTest DatasourcesResource datasourcesResource = new DatasourcesResource( inventoryView, null, + null, indexingServiceClient, new AuthConfig(), null @@ -579,4 +600,254 @@ public class DatasourcesResourceTest EasyMock.verify(indexingServiceClient, server); } + @Test + public void testIsHandOffComplete() + { + MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); + Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null); + Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z")); + DatasourcesResource datasourcesResource = new DatasourcesResource( + inventoryView, + null, + databaseRuleManager, + null, + new AuthConfig(), + null + ); + + // test dropped + EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1")) + .andReturn(ImmutableList.of(loadRule, dropRule)) + .once(); + EasyMock.replay(databaseRuleManager); + + String interval1 = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z"; + Response response1 = datasourcesResource.isHandOffComplete("dataSource1", interval1, 1, "v1"); + Assert.assertTrue((boolean) response1.getEntity()); + + EasyMock.verify(databaseRuleManager); + + // test isn't dropped and no timeline found + EasyMock.reset(databaseRuleManager); + EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1")) + .andReturn(ImmutableList.of(loadRule, dropRule)) + .once(); + EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1"))) + .andReturn(null) + .once(); + EasyMock.replay(inventoryView, databaseRuleManager); + + String interval2 = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z"; + Response response2 = datasourcesResource.isHandOffComplete("dataSource1", interval2, 1, "v1"); + Assert.assertFalse((boolean) response2.getEntity()); + + EasyMock.verify(inventoryView, databaseRuleManager); + + // test isn't dropped and timeline exist + String interval3 = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z"; + SegmentLoadInfo segmentLoadInfo = new SegmentLoadInfo(createSegment(Intervals.of(interval3), "v1", 1)); + segmentLoadInfo.addServer(createHistoricalServerMetadata("test")); + VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new VersionedIntervalTimeline<String, SegmentLoadInfo>( + null) + { + @Override + public List<TimelineObjectHolder<String, SegmentLoadInfo>> lookupWithIncompletePartitions(Interval interval) + { + PartitionHolder<SegmentLoadInfo> partitionHolder = new PartitionHolder<>(new NumberedPartitionChunk<>( + 1, + 1, + segmentLoadInfo + )); + List<TimelineObjectHolder<String, SegmentLoadInfo>> ret = new ArrayList<>(); + ret.add(new TimelineObjectHolder<>(Intervals.of(interval3), "v1", partitionHolder)); + return ret; + } + }; + EasyMock.reset(inventoryView, databaseRuleManager); + EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1")) + .andReturn(ImmutableList.of(loadRule, dropRule)) + .once(); + EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1"))) + .andReturn(timeline) + .once(); + EasyMock.replay(inventoryView, databaseRuleManager); + + Response response3 = datasourcesResource.isHandOffComplete("dataSource1", interval3, 1, "v1"); + Assert.assertTrue((boolean) response3.getEntity()); + + EasyMock.verify(inventoryView, databaseRuleManager); + } + + @Test + public void testSegmentLoadChecksForVersion() + { + Interval interval = Intervals.of( + "2011-04-01/2011-04-02" + ); + Assert.assertFalse( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v2", 2) + ) + ); + + Assert.assertTrue( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v2", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertTrue( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testSegmentLoadChecksForAssignableServer() + { + Interval interval = Intervals.of( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertFalse( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createRealtimeServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + } + + @Test + public void testSegmentLoadChecksForPartitionNumber() + { + Interval interval = Intervals.of( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 1) + ) + ); + + Assert.assertFalse( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testSegmentLoadChecksForInterval() + { + + Assert.assertFalse( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1) + ) + ); + + Assert.assertTrue( + DatasourcesResource.isSegmentLoaded( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1) + ) + ); + } + + private DruidServerMetadata createRealtimeServerMetadata(String name) + { + return createServerMetadata(name, ServerType.REALTIME); + } + + private DruidServerMetadata createHistoricalServerMetadata(String name) + { + return createServerMetadata(name, ServerType.HISTORICAL); + } + + private DruidServerMetadata createServerMetadata(String name, ServerType type) + { + return new DruidServerMetadata( + name, + name, + null, + 10000, + type, + "tier", + 1 + ); + } + + private DataSegment createSegment(Interval interval, String version, int partitionNumber) + { + return new DataSegment( + "test_ds", + interval, + version, + null, + null, + null, + new NumberedShardSpec(partitionNumber, 100), + 0, 0 + ); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org