This is an automated email from the ASF dual-hosted git repository.
davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f0ecdfe Fix `is_realtime` column behavior in sys.segments table
(#8154)
f0ecdfe is described below
commit f0ecdfee30853535e4bfbbfc4f769ccd12d0694a
Author: Surekha <[email protected]>
AuthorDate: Wed Jul 31 21:26:49 2019 -0700
Fix `is_realtime` column behavior in sys.segments table (#8154)
* Fix is_realtime flag
* make variable final
* minor changes
* Modify is_realtime behavior based on review comment
* Fix UT
---
docs/content/querying/sql.md | 2 +-
.../calcite/schema/AvailableSegmentMetadata.java | 21 ++++--
.../druid/sql/calcite/schema/DruidSchema.java | 43 +++++++----
.../druid/sql/calcite/schema/DruidSchemaTest.java | 84 +++++++++++++++++++---
.../sql/calcite/util/TestServerInventoryView.java | 13 +++-
5 files changed, 133 insertions(+), 30 deletions(-)
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 8dfb0e8..3678024 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -655,7 +655,7 @@ Note that a segment can be served by more than one stream
ingestion tasks or His
|num_rows|LONG|Number of rows in current segment, this value could be null if
unkown to Broker at query time|
|is_published|LONG|Boolean is represented as long type where 1 = true, 0 =
false. 1 represents this segment has been published to the metadata store with
`used=1`|
|is_available|LONG|Boolean is represented as long type where 1 = true, 0 =
false. 1 if this segment is currently being served by any process(Historical or
realtime)|
-|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks|
+|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment|
|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 =
false. 1 if this segment is published and is _fully_ overshadowed by some other
published segments. Currently, is_overshadowed is always false for unpublished
segments, although this may change in the future. You can filter for segments
that "should be published" by filtering for `is_published = 1 AND
is_overshadowed = 0`. Segments can briefly be both published and overshadowed
if they were recently replaced, b [...]
|payload|STRING|JSON-serialized data segment payload|
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
index 1dc8e1e..4efff11 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
@@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.DataSegment;
@@ -34,7 +35,7 @@ public class AvailableSegmentMetadata
public static Builder builder(
DataSegment segment,
long isRealtime,
- Set<String> segmentServers,
+ Set<DruidServerMetadata> segmentServers,
RowSignature rowSignature,
long numRows
)
@@ -58,7 +59,7 @@ public class AvailableSegmentMetadata
// to make it easy to count number of segments which are realtime
private final long isRealtime;
// set of servers that contain the segment
- private final Set<String> segmentServers;
+ private final Set<DruidServerMetadata> segmentServers;
private final long numRows;
@Nullable
private final RowSignature rowSignature;
@@ -82,7 +83,7 @@ public class AvailableSegmentMetadata
return segment;
}
- public Set<String> getReplicas()
+ public Set<DruidServerMetadata> getReplicas()
{
return segmentServers;
}
@@ -106,9 +107,9 @@ public class AvailableSegmentMetadata
public static class Builder
{
private final DataSegment segment;
- private final long isRealtime;
- private Set<String> segmentServers;
+ private long isRealtime;
+ private Set<DruidServerMetadata> segmentServers;
@Nullable
private RowSignature rowSignature;
private long numRows;
@@ -116,7 +117,7 @@ public class AvailableSegmentMetadata
private Builder(
DataSegment segment,
long isRealtime,
- Set<String> servers,
+ Set<DruidServerMetadata> servers,
@Nullable RowSignature rowSignature,
long numRows
)
@@ -140,12 +141,18 @@ public class AvailableSegmentMetadata
return this;
}
- public Builder withReplicas(Set<String> servers)
+ public Builder withReplicas(Set<DruidServerMetadata> servers)
{
this.segmentServers = servers;
return this;
}
+ public Builder withRealtime(long isRealtime)
+ {
+ this.isRealtime = isRealtime;
+ return this;
+ }
+
public AvailableSegmentMetadata build()
{
return new AvailableSegmentMetadata(this);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 2116fbb..55a9a79 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -57,6 +57,7 @@ import
org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -73,6 +74,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -359,14 +361,12 @@ public class DruidSchema extends AbstractSchema
final Map<SegmentId, AvailableSegmentMetadata> knownSegments =
segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ?
knownSegments.get(segment.getId()) : null;
if (segmentMetadata == null) {
- // segmentReplicatable is used to determine if segments are served by realtime servers or not
- final long isRealtime = server.segmentReplicatable() ? 0 : 1;
-
- final Set<String> servers = ImmutableSet.of(server.getName());
+ // segmentReplicatable is used to determine if segments are served by historical or realtime servers
+ long isRealtime = server.segmentReplicatable() ? 0 : 1;
segmentMetadata = AvailableSegmentMetadata.builder(
segment,
isRealtime,
- servers,
+ ImmutableSet.of(server),
null,
DEFAULT_NUM_ROWS
).build();
@@ -380,14 +380,15 @@ public class DruidSchema extends AbstractSchema
log.debug("Added new immutable segment[%s].", segment.getId());
}
} else {
- final Set<String> segmentServers = segmentMetadata.getReplicas();
- final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
+ final Set<DruidServerMetadata> segmentServers =
segmentMetadata.getReplicas();
+ final ImmutableSet<DruidServerMetadata> servers = new
ImmutableSet.Builder<DruidServerMetadata>()
.addAll(segmentServers)
- .add(server.getName())
+ .add(server)
.build();
final AvailableSegmentMetadata metadataWithNumReplicas =
AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
+ .withRealtime(recomputeIsRealtime(servers))
.build();
knownSegments.put(segment.getId(), metadataWithNumReplicas);
if (server.segmentReplicatable()) {
@@ -431,19 +432,23 @@ public class DruidSchema extends AbstractSchema
}
}
- private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
+ @VisibleForTesting
+ void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(),
server.getName());
final Map<SegmentId, AvailableSegmentMetadata> knownSegments =
segmentMetadataInfo.get(segment.getDataSource());
final AvailableSegmentMetadata segmentMetadata =
knownSegments.get(segment.getId());
- final Set<String> segmentServers = segmentMetadata.getReplicas();
- final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
- .filter(Predicates.not(Predicates.equalTo(server.getName())))
- .toSet();
+ final Set<DruidServerMetadata> segmentServers =
segmentMetadata.getReplicas();
+ final ImmutableSet<DruidServerMetadata> servers = FluentIterable
+ .from(segmentServers)
+ .filter(Predicates.not(Predicates.equalTo(server)))
+ .toSet();
+
final AvailableSegmentMetadata metadataWithNumReplicas =
AvailableSegmentMetadata
.from(segmentMetadata)
.withReplicas(servers)
+ .withRealtime(recomputeIsRealtime(servers))
.build();
knownSegments.put(segment.getId(), metadataWithNumReplicas);
lock.notifyAll();
@@ -475,6 +480,18 @@ public class DruidSchema extends AbstractSchema
return retVal;
}
+ private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
+ {
+ final Optional<DruidServerMetadata> historicalServer = servers
+ .stream()
+ .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
+ .findAny();
+
+ // if there is any historical server in the replicas, isRealtime flag should be unset
+ final long isRealtime = historicalServer.isPresent() ? 0 : 1;
+ return isRealtime;
+ }
+
/**
* Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of
* segments actually refreshed, which may be a subset of the asked-for set.
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 08fcb7e..1b2573e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable;
@@ -54,6 +55,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -168,8 +170,20 @@ public class DruidSchemaTest extends CalciteTestBase
.build(),
index2
);
-
- final TimelineServerView serverView = new
TestServerInventoryView(walker.getSegments());
+ final DataSegment segment1 = new DataSegment(
+ "foo3",
+ Intervals.of("2012/2013"),
+ "version3",
+ null,
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("met1", "met2"),
+ new NumberedShardSpec(2, 3),
+ 1,
+ 100L,
+ DataSegment.PruneLoadSpecHolder.DEFAULT
+ );
+ final List<DataSegment> realtimeSegments = ImmutableList.of(segment1);
+ final TimelineServerView serverView = new
TestServerInventoryView(walker.getSegments(), realtimeSegments);
druidServers = serverView.getDruidServers();
schema = new DruidSchema(
@@ -253,14 +267,14 @@ public class DruidSchemaTest extends CalciteTestBase
* is called more than once for same segment
*/
@Test
- public void testSegmentMetadataHolderNumRows()
+ public void testAvailableSegmentMetadataNumRows()
{
Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata =
schema.getSegmentMetadataSnapshot();
final List<DataSegment> segments = segmentsMetadata.values()
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());
- Assert.assertEquals(3, segments.size());
+ Assert.assertEquals(4, segments.size());
// find the only segment with datasource "foo2"
final DataSegment existingSegment = segments.stream()
.filter(segment ->
segment.getDataSource().equals("foo2"))
@@ -309,7 +323,7 @@ public class DruidSchemaTest extends CalciteTestBase
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());
- Assert.assertEquals(segments.size(), 3);
+ Assert.assertEquals(4, segments.size());
// segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2"
final DataSegment segmentToRemove = segments.stream()
@@ -321,7 +335,7 @@ public class DruidSchemaTest extends CalciteTestBase
// The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
- Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
+ Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
}
@Test
@@ -332,7 +346,7 @@ public class DruidSchemaTest extends CalciteTestBase
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());
- Assert.assertEquals(segments.size(), 3);
+ Assert.assertEquals(4, segments.size());
// remove one of the segments with datasource "foo"
final DataSegment segmentToRemove = segments.stream()
.filter(segment ->
segment.getDataSource().equals("foo"))
@@ -343,7 +357,61 @@ public class DruidSchemaTest extends CalciteTestBase
// The following line can cause NPE without segmentMetadata null check in DruidSchema#refreshSegmentsForDataSource
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
- Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
+ Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
+ }
+
+ @Test
+ public void testAvailableSegmentMetadataIsRealtime()
+ {
+ Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata =
schema.getSegmentMetadataSnapshot();
+ final List<DataSegment> segments = segmentsMetadata.values()
+ .stream()
+ .map(AvailableSegmentMetadata::getSegment)
+ .collect(Collectors.toList());
+ // find the only realtime segment with datasource "foo3"
+ final DataSegment existingSegment = segments.stream()
+ .filter(segment ->
segment.getDataSource().equals("foo3"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(existingSegment);
+ final AvailableSegmentMetadata metadata =
segmentsMetadata.get(existingSegment.getId());
+ Assert.assertEquals(1L, metadata.isRealtime());
+ // get the historical server
+ final ImmutableDruidServer historicalServer = druidServers.stream()
+ .filter(s ->
s.getType().equals(ServerType.HISTORICAL))
+ .findAny()
+ .orElse(null);
+
+ Assert.assertNotNull(historicalServer);
+ final DruidServerMetadata historicalServerMetadata =
historicalServer.getMetadata();
+
+ // add existingSegment to historical
+ schema.addSegment(historicalServerMetadata, existingSegment);
+ segmentsMetadata = schema.getSegmentMetadataSnapshot();
+ // get the segment with datasource "foo3"
+ DataSegment currentSegment = segments.stream()
+ .filter(segment ->
segment.getDataSource().equals("foo3"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(currentSegment);
+ AvailableSegmentMetadata currentMetadata =
segmentsMetadata.get(currentSegment.getId());
+ Assert.assertEquals(0L, currentMetadata.isRealtime());
+
+ ImmutableDruidServer realtimeServer = druidServers.stream()
+ .filter(s ->
s.getType().equals(ServerType.REALTIME))
+ .findAny()
+ .orElse(null);
+ Assert.assertNotNull(realtimeServer);
+ // drop existingSegment from realtime task
+ schema.removeServerSegment(realtimeServer.getMetadata(), existingSegment);
+ segmentsMetadata = schema.getSegmentMetadataSnapshot();
+ currentSegment = segments.stream()
+ .filter(segment ->
segment.getDataSource().equals("foo3"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(currentSegment);
+ currentMetadata = segmentsMetadata.get(currentSegment.getId());
+ Assert.assertEquals(0L, currentMetadata.isRealtime());
}
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 2dcc569..0f36273 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -93,7 +93,18 @@ public class TestServerInventoryView implements
TimelineServerView
ImmutableMap.of("src", dataSource),
1
);
- return ImmutableList.of(server);
+ final ImmutableDruidDataSource dataSource2 = new ImmutableDruidDataSource(
+ "DUMMY2",
+ Collections.emptyMap(),
+ realtimeSegments
+ );
+ final ImmutableDruidServer realtimeServer = new ImmutableDruidServer(
+ DUMMY_SERVER_REALTIME,
+ 0L,
+ ImmutableMap.of("src", dataSource2),
+ 1
+ );
+ return ImmutableList.of(server, realtimeServer);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]