This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 44b803ae8e Rename LLRealtimeSegmentDataManager to
RealtimeSegmentDataManager (#11687)
44b803ae8e is described below
commit 44b803ae8e3eee5cc191793b1ab4c35a682c6151
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Sep 26 21:08:21 2023 -0700
Rename LLRealtimeSegmentDataManager to RealtimeSegmentDataManager (#11687)
---
...anager.java => RealtimeSegmentDataManager.java} | 6 +-
.../manager/realtime/RealtimeTableDataManager.java | 8 +-
.../data/manager/realtime/SegmentCommitter.java | 2 +-
.../manager/realtime/SplitSegmentCommitter.java | 2 +-
...st.java => RealtimeSegmentDataManagerTest.java} | 170 ++++++++++-----------
.../pinot/server/api/resources/DebugResource.java | 4 +-
.../pinot/server/api/resources/TablesResource.java | 8 +-
.../FreshnessBasedConsumptionStatusChecker.java | 4 +-
.../starter/helix/HelixInstanceDataManager.java | 12 +-
.../IngestionBasedConsumptionStatusChecker.java | 8 +-
.../helix/OffsetBasedConsumptionStatusChecker.java | 4 +-
.../SegmentOnlineOfflineStateModelFactory.java | 6 +-
...FreshnessBasedConsumptionStatusCheckerTest.java | 40 ++---
.../OffsetBasedConsumptionStatusCheckerTest.java | 26 ++--
14 files changed, 150 insertions(+), 150 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
similarity index 99%
rename from
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 97c1fbfd82..e772eae7e8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -107,7 +107,7 @@ import org.slf4j.LoggerFactory;
/**
* Segment data manager for low level consumer realtime segments, which
manages consumption and segment completion.
*/
-public class LLRealtimeSegmentDataManager extends SegmentDataManager {
+public class RealtimeSegmentDataManager extends SegmentDataManager {
@VisibleForTesting
public enum State {
@@ -1314,7 +1314,7 @@ public class LLRealtimeSegmentDataManager extends
SegmentDataManager {
// Assume that this is called only on OFFLINE to CONSUMING transition.
// If the transition is OFFLINE to ONLINE, the caller should have downloaded
the segment and we don't reach here.
- public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata,
TableConfig tableConfig,
+ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata,
TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, Semaphore
partitionGroupConsumerSemaphore,
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
@@ -1357,7 +1357,7 @@ public class LLRealtimeSegmentDataManager extends
SegmentDataManager {
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
_clientId = _tableNameWithType + "-" + streamTopic + "-" +
_partitionGroupId;
- _segmentLogger =
LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" +
_segmentNameStr);
+ _segmentLogger =
LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" +
_segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
if (_indexLoadingConfig.isRealtimeOffHeapAllocation() &&
!_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
_memoryManager =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index e4b34d244d..88ca87ad62 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -436,12 +436,12 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
- LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
- new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this,
_indexDir.getAbsolutePath(),
+ RealtimeSegmentDataManager realtimeSegmentDataManager =
+ new RealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this,
_indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore,
_serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager, _isTableReadyToConsumeData);
- llRealtimeSegmentDataManager.startConsumption();
- segmentDataManager = llRealtimeSegmentDataManager;
+ realtimeSegmentDataManager.startConsumption();
+ segmentDataManager = realtimeSegmentDataManager;
_logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
registerSegment(segmentName, segmentDataManager);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
index 922c54bd55..b9c1270e39 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitter.java
@@ -30,5 +30,5 @@ public interface SegmentCommitter {
* @param segmentBuildDescriptor object that describes segment to be
committed
* @return
*/
- SegmentCompletionProtocol.Response
commit(LLRealtimeSegmentDataManager.SegmentBuildDescriptor
segmentBuildDescriptor);
+ SegmentCompletionProtocol.Response
commit(RealtimeSegmentDataManager.SegmentBuildDescriptor
segmentBuildDescriptor);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 7f3d3ea1fb..1e4ebfe1f8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -63,7 +63,7 @@ public class SplitSegmentCommitter implements
SegmentCommitter {
@Override
public SegmentCompletionProtocol.Response commit(
- LLRealtimeSegmentDataManager.SegmentBuildDescriptor
segmentBuildDescriptor) {
+ RealtimeSegmentDataManager.SegmentBuildDescriptor
segmentBuildDescriptor) {
File segmentTarFile = segmentBuildDescriptor.getSegmentTarFile();
SegmentCompletionProtocol.Response segmentCommitStartResponse =
_protocolHandler.segmentCommitStart(_params);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
similarity index 87%
rename from
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index b36059def7..1574892f42 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -77,8 +77,8 @@ import static org.mockito.Mockito.when;
// TODO Re-write this test using the stream abstraction
-public class LLRealtimeSegmentDataManagerTest {
- private static final String SEGMENT_DIR = "/tmp/" +
LLRealtimeSegmentDataManagerTest.class.getSimpleName();
+public class RealtimeSegmentDataManagerTest {
+ private static final String SEGMENT_DIR = "/tmp/" +
RealtimeSegmentDataManagerTest.class.getSimpleName();
private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@@ -120,12 +120,12 @@ public class LLRealtimeSegmentDataManagerTest {
return segmentZKMetadata;
}
- private FakeLLRealtimeSegmentDataManager createFakeSegmentManager()
+ private FakeRealtimeSegmentDataManager createFakeSegmentManager()
throws Exception {
return createFakeSegmentManager(false, new TimeSupplier(), null, null,
null);
}
- private FakeLLRealtimeSegmentDataManager createFakeSegmentManager(boolean
noUpsert, TimeSupplier timeSupplier,
+ private FakeRealtimeSegmentDataManager createFakeSegmentManager(boolean
noUpsert, TimeSupplier timeSupplier,
@Nullable String maxRows, @Nullable String maxDuration, @Nullable
TableConfig tableConfig)
throws Exception {
SegmentZKMetadata segmentZKMetadata = createZkMetadata();
@@ -148,7 +148,7 @@ public class LLRealtimeSegmentDataManagerTest {
_partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new
Semaphore(1));
Schema schema = Fixtures.createSchema();
ServerMetrics serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- return new FakeLLRealtimeSegmentDataManager(segmentZKMetadata,
tableConfig, tableDataManager, SEGMENT_DIR, schema,
+ return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
tableDataManager, SEGMENT_DIR, schema,
llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics,
timeSupplier);
}
@@ -169,8 +169,8 @@ public class LLRealtimeSegmentDataManagerTest {
@Test
public void testHolding()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
// We should consume initially...
segmentDataManager._consumeOffsets.add(endOffset);
@@ -191,7 +191,7 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildSegmentCalled);
Assert.assertFalse(segmentDataManager._commitSegmentCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.HOLDING);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.HOLDING);
segmentDataManager.destroy();
}
@@ -199,8 +199,8 @@ public class LLRealtimeSegmentDataManagerTest {
@Test
public void testCommitAfterHold()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
// We should consume initially...
segmentDataManager._consumeOffsets.add(endOffset);
@@ -223,15 +223,15 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- LLRealtimeSegmentDataManager.State.COMMITTED);
+ RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@Test
public void testSegmentBuildException()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
// We should consume initially...
segmentDataManager._consumeOffsets.add(endOffset);
@@ -243,7 +243,7 @@ public class LLRealtimeSegmentDataManagerTest {
consumer.run();
Assert.assertTrue(segmentDataManager._buildSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.ERROR);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.ERROR);
segmentDataManager.destroy();
}
@@ -251,8 +251,8 @@ public class LLRealtimeSegmentDataManagerTest {
@Test
public void testCommitAfterCatchup()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset firstOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
final LongMsgOffset catchupOffset = new
LongMsgOffset(firstOffset.getOffset() + 10);
// We should consume initially...
@@ -287,7 +287,7 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- LLRealtimeSegmentDataManager.State.COMMITTED);
+ RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@@ -297,9 +297,9 @@ public class LLRealtimeSegmentDataManagerTest {
tableConfig.getIndexingConfig().getStreamConfigs()
.put(StreamConfigProperties.constructStreamProperty(
StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
"fakeStream"), "2d");
- FakeLLRealtimeSegmentDataManager segmentDataManager =
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(false, new TimeSupplier(), null, null,
tableConfig);
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset firstOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
final LongMsgOffset catchupOffset = new
LongMsgOffset(firstOffset.getOffset() + 10);
// We should consume initially...
@@ -334,7 +334,7 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- LLRealtimeSegmentDataManager.State.COMMITTED);
+ RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@@ -344,9 +344,9 @@ public class LLRealtimeSegmentDataManagerTest {
tableConfig.getIndexingConfig().getStreamConfigs()
.put(StreamConfigProperties.constructStreamProperty(
StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
"fakeStream"), Instant.now().toString());
- FakeLLRealtimeSegmentDataManager segmentDataManager =
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(false, new TimeSupplier(), null, null,
tableConfig);
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset firstOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
final LongMsgOffset catchupOffset = new
LongMsgOffset(firstOffset.getOffset() + 10);
// We should consume initially...
@@ -381,15 +381,15 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- LLRealtimeSegmentDataManager.State.COMMITTED);
+ RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@Test
public void testDiscarded()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
segmentDataManager._consumeOffsets.add(endOffset);
final SegmentCompletionProtocol.Response discardResponse = new
SegmentCompletionProtocol.Response(
@@ -406,15 +406,15 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._commitSegmentCalled);
Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- LLRealtimeSegmentDataManager.State.DISCARDED);
+ RealtimeSegmentDataManager.State.DISCARDED);
segmentDataManager.destroy();
}
@Test
public void testRetained()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
segmentDataManager._consumeOffsets.add(endOffset);
SegmentCompletionProtocol.Response.Params params = new
SegmentCompletionProtocol.Response.Params();
@@ -431,15 +431,15 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._commitSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.RETAINED);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.RETAINED);
segmentDataManager.destroy();
}
@Test
public void testNotLeader()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
// We should consume initially...
segmentDataManager._consumeOffsets.add(endOffset);
@@ -459,15 +459,15 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildSegmentCalled);
Assert.assertFalse(segmentDataManager._commitSegmentCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
LLRealtimeSegmentDataManager.State.HOLDING);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.HOLDING);
segmentDataManager.destroy();
}
@Test
public void testConsumingException()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
segmentDataManager._throwExceptionFromConsume = true;
segmentDataManager._postConsumeStoppedCalled = false;
@@ -489,9 +489,9 @@ public class LLRealtimeSegmentDataManagerTest {
metadata.setEndOffset(finalOffset.toString());
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.COMMITTED);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -499,9 +499,9 @@ public class LLRealtimeSegmentDataManagerTest {
}
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.RETAINED);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.RETAINED);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -509,9 +509,9 @@ public class LLRealtimeSegmentDataManagerTest {
}
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.DISCARDED);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.DISCARDED);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -519,9 +519,9 @@ public class LLRealtimeSegmentDataManagerTest {
}
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.ERROR);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.ERROR);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
@@ -530,9 +530,9 @@ public class LLRealtimeSegmentDataManagerTest {
// If holding, but we have overshot the expected final offset, the
download and replace
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.HOLDING);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.HOLDING);
segmentDataManager.setCurrentOffset(finalOffsetValue + 1);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
@@ -542,9 +542,9 @@ public class LLRealtimeSegmentDataManagerTest {
// If catching up, but we have overshot the expected final offset, the
download and replace
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CATCHING_UP);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CATCHING_UP);
segmentDataManager.setCurrentOffset(finalOffsetValue + 1);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
@@ -554,9 +554,9 @@ public class LLRealtimeSegmentDataManagerTest {
// If catching up, but we did not get to the final offset, then download
and replace
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CATCHING_UP);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CATCHING_UP);
segmentDataManager._consumeOffsets.add(new
LongMsgOffset(finalOffsetValue - 1));
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
@@ -566,9 +566,9 @@ public class LLRealtimeSegmentDataManagerTest {
// But then if we get to the exact offset, we get to build and replace,
not download
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CATCHING_UP);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CATCHING_UP);
segmentDataManager._consumeOffsets.add(finalOffset);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
@@ -582,8 +582,8 @@ public class LLRealtimeSegmentDataManagerTest {
throws Exception {
// test reaching max row limit
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
segmentDataManager.setNumRowsIndexed(Fixtures.MAX_ROWS_IN_SEGMENT - 1);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
@@ -594,8 +594,8 @@ public class LLRealtimeSegmentDataManagerTest {
}
// test reaching max time limit
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
// We should still get false because there is no messages fetched
segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS +
1);
@@ -609,8 +609,8 @@ public class LLRealtimeSegmentDataManagerTest {
}
// In catching up state, test reaching final offset
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CATCHING_UP);
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CATCHING_UP);
final long finalOffset = START_OFFSET_VALUE + 100;
segmentDataManager.setFinalOffset(finalOffset);
segmentDataManager.setCurrentOffset(finalOffset - 1);
@@ -621,9 +621,9 @@ public class LLRealtimeSegmentDataManagerTest {
}
// In catching up state, test reaching final offset ignoring time
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS);
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CATCHING_UP);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CATCHING_UP);
final long finalOffset = START_OFFSET_VALUE + 100;
segmentDataManager.setFinalOffset(finalOffset);
segmentDataManager.setCurrentOffset(finalOffset - 1);
@@ -635,9 +635,9 @@ public class LLRealtimeSegmentDataManagerTest {
// When we go from consuming to online state, time and final offset matter.
// Case 1. We have reached final offset.
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
segmentDataManager._timeSupplier.add(1);
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
segmentDataManager.setConsumeEndTime(segmentDataManager._timeSupplier.get() +
10);
final long finalOffset = START_OFFSET_VALUE + 100;
segmentDataManager.setFinalOffset(finalOffset);
@@ -649,8 +649,8 @@ public class LLRealtimeSegmentDataManagerTest {
}
// Case 2. We have reached time limit.
{
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
final long endTime = segmentDataManager._timeSupplier.get() + 10;
segmentDataManager.setConsumeEndTime(endTime);
final long finalOffset = START_OFFSET_VALUE + 100;
@@ -664,9 +664,9 @@ public class LLRealtimeSegmentDataManagerTest {
}
}
- private void setHasMessagesFetched(FakeLLRealtimeSegmentDataManager
segmentDataManager, boolean hasMessagesFetched)
+ private void setHasMessagesFetched(FakeRealtimeSegmentDataManager
segmentDataManager, boolean hasMessagesFetched)
throws Exception {
- Field field =
LLRealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched");
+ Field field =
RealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched");
field.setAccessible(true);
field.set(segmentDataManager, hasMessagesFetched);
}
@@ -675,7 +675,7 @@ public class LLRealtimeSegmentDataManagerTest {
@Test
public void testReuseOfBuiltSegment()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
SegmentCompletionProtocol.Response.Params params = new
SegmentCompletionProtocol.Response.Params();
params.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
@@ -712,7 +712,7 @@ public class LLRealtimeSegmentDataManagerTest {
@Test
public void testFileRemovedDuringOnlineTransition()
throws Exception {
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
SegmentCompletionProtocol.Response.Params params = new
SegmentCompletionProtocol.Response.Params();
params.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
@@ -735,7 +735,7 @@ public class LLRealtimeSegmentDataManagerTest {
SegmentZKMetadata metadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
metadata.setEndOffset(new LongMsgOffset(finalOffset).toString());
segmentDataManager._stopWaitTimeMs = 0;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.HOLDING);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.HOLDING);
segmentDataManager.goOnlineFromConsuming(metadata);
Assert.assertFalse(segmentTarFile.exists());
segmentDataManager.destroy();
@@ -745,13 +745,13 @@ public class LLRealtimeSegmentDataManagerTest {
public void testOnlyOneSegmentHoldingTheSemaphoreForParticularPartition()
throws Exception {
long timeout = 10_000L;
- FakeLLRealtimeSegmentDataManager firstSegmentDataManager =
createFakeSegmentManager();
+ FakeRealtimeSegmentDataManager firstSegmentDataManager =
createFakeSegmentManager();
Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get());
Semaphore firstSemaphore =
firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
Assert.assertEquals(firstSemaphore.availablePermits(), 0);
Assert.assertFalse(firstSemaphore.hasQueuedThreads());
- AtomicReference<FakeLLRealtimeSegmentDataManager> secondSegmentDataManager
= new AtomicReference<>(null);
+ AtomicReference<FakeRealtimeSegmentDataManager> secondSegmentDataManager =
new AtomicReference<>(null);
// Construct the second segment manager, which will be blocked on the
semaphore.
Thread constructSecondSegmentManager = new Thread(() -> {
@@ -837,13 +837,13 @@ public class LLRealtimeSegmentDataManagerTest {
return now + TimeUnit.MINUTES.toMillis(segmentTimeThresholdMins + 1);
}
};
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(true, timeSupplier,
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(true, timeSupplier,
String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2),
segmentTimeThresholdMins + "m", null);
segmentDataManager._stubConsumeLoop = false;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset =
new LongMsgOffset(START_OFFSET_VALUE +
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
segmentDataManager._consumeOffsets.add(endOffset);
@@ -876,13 +876,13 @@ public class LLRealtimeSegmentDataManagerTest {
throws Exception {
final int segmentTimeThresholdMins = 10;
TimeSupplier timeSupplier = new TimeSupplier();
- FakeLLRealtimeSegmentDataManager segmentDataManager =
+ FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(true, timeSupplier,
String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS),
segmentTimeThresholdMins + "m", null);
segmentDataManager._stubConsumeLoop = false;
- segmentDataManager._state.set(segmentDataManager,
LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+ segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
- LLRealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
final LongMsgOffset endOffset =
new LongMsgOffset(START_OFFSET_VALUE +
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
segmentDataManager._consumeOffsets.add(endOffset);
@@ -928,7 +928,7 @@ public class LLRealtimeSegmentDataManagerTest {
}
}
- public static class FakeLLRealtimeSegmentDataManager extends
LLRealtimeSegmentDataManager {
+ public static class FakeRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
public Field _state;
public Field _shouldStop;
@@ -958,7 +958,7 @@ public class LLRealtimeSegmentDataManagerTest {
return dataManagerConfig;
}
- public FakeLLRealtimeSegmentDataManager(SegmentZKMetadata
segmentZKMetadata, TableConfig tableConfig,
+ public FakeRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata,
TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, Schema schema,
LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap,
ServerMetrics serverMetrics,
TimeSupplier timeSupplier)
@@ -966,14 +966,14 @@ public class LLRealtimeSegmentDataManagerTest {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(),
tableConfig), schema, llcSegmentName,
semaphoreMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics, null, null, () -> true);
- _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
+ _state = RealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
- _shouldStop =
LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
+ _shouldStop =
RealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
_shouldStop.setAccessible(true);
- _stopReason =
LLRealtimeSegmentDataManager.class.getDeclaredField("_stopReason");
+ _stopReason =
RealtimeSegmentDataManager.class.getDeclaredField("_stopReason");
_stopReason.setAccessible(true);
_semaphoreMap = semaphoreMap;
- _streamMsgOffsetFactory =
LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
+ _streamMsgOffsetFactory =
RealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
_streamMsgOffsetFactory.setAccessible(true);
_streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
_timeSupplier = timeSupplier;
@@ -1129,7 +1129,7 @@ public class LLRealtimeSegmentDataManagerTest {
public boolean invokeEndCriteriaReached() {
Method endCriteriaReached = null;
try {
- endCriteriaReached =
LLRealtimeSegmentDataManager.class.getDeclaredMethod("endCriteriaReached");
+ endCriteriaReached =
RealtimeSegmentDataManager.class.getDeclaredMethod("endCriteriaReached");
endCriteriaReached.setAccessible(true);
Boolean result = (Boolean) endCriteriaReached.invoke(this);
return result;
@@ -1149,7 +1149,7 @@ public class LLRealtimeSegmentDataManagerTest {
private void setLong(long value, String fieldName) {
try {
- Field field =
LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName);
+ Field field =
RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.setLong(this, value);
} catch (NoSuchFieldException e) {
@@ -1161,7 +1161,7 @@ public class LLRealtimeSegmentDataManagerTest {
private void setOffset(long value, String fieldName) {
try {
- Field field =
LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName);
+ Field field =
RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
field.setAccessible(true);
StreamPartitionMsgOffset offset = (StreamPartitionMsgOffset)
field.get(this);
// if (offset == null) {
@@ -1178,7 +1178,7 @@ public class LLRealtimeSegmentDataManagerTest {
private void setInt(int value, String fieldName) {
try {
- Field field =
LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName);
+ Field field =
RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.setInt(this, value);
} catch (NoSuchFieldException e) {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index e7b79c5156..5c666a28bd 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -45,7 +45,7 @@ import
org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -166,7 +166,7 @@ public class DebugResource {
private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager
segmentDataManager, TableType tableType) {
SegmentConsumerInfo segmentConsumerInfo = null;
if (tableType == TableType.REALTIME) {
- LLRealtimeSegmentDataManager realtimeSegmentDataManager =
(LLRealtimeSegmentDataManager) segmentDataManager;
+ RealtimeSegmentDataManager realtimeSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
Map<String, ConsumerPartitionState> partitionStateMap =
realtimeSegmentDataManager.getConsumerPartitionState();
Map<String, String> currentOffsets =
realtimeSegmentDataManager.getPartitionToCurrentOffset();
Map<String, String> upstreamLatest =
partitionStateMap.entrySet().stream().collect(
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 7999360044..b08833b966 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -76,7 +76,7 @@ import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -309,7 +309,7 @@ public class TablesResource {
int totalSegmentCount = 0;
Map<String, Map<String, Integer>> columnToIndexesCount = new HashMap<>();
for (SegmentDataManager segmentDataManager : allSegments) {
- if (segmentDataManager instanceof LLRealtimeSegmentDataManager) {
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
// REALTIME segments may not have indexes since not all indexes have
mutable implementations
continue;
}
@@ -685,8 +685,8 @@ public class TablesResource {
List<SegmentDataManager> segmentDataManagers =
tableDataManager.acquireAllSegments();
try {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- if (segmentDataManager instanceof LLRealtimeSegmentDataManager) {
- LLRealtimeSegmentDataManager realtimeSegmentDataManager =
(LLRealtimeSegmentDataManager) segmentDataManager;
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ RealtimeSegmentDataManager realtimeSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
Map<String, ConsumerPartitionState> partitionStateMap =
realtimeSegmentDataManager.getConsumerPartitionState();
Map<String, String> recordsLagMap = new HashMap<>();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index abce5d5aaa..6f3610e596 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -21,7 +21,7 @@ package org.apache.pinot.server.starter.helix;
import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -63,7 +63,7 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
}
@Override
- protected boolean isSegmentCaughtUp(String segmentName,
LLRealtimeSegmentDataManager rtSegmentDataManager) {
+ protected boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager) {
long now = now();
long latestIngestionTimestamp =
rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index ba66798763..480c392db8 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -53,8 +53,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
@@ -171,7 +171,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
File[] tableDataDirs = instanceDataDir.listFiles((dir, name) ->
TableNameBuilder.isTableResource(name));
if (tableDataDirs != null) {
for (File tableDataDir : tableDataDirs) {
- File resourceTempDir = new File(tableDataDir,
LLRealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME);
+ File resourceTempDir = new File(tableDataDir,
RealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME);
try {
FileUtils.deleteDirectory(resourceTempDir);
} catch (IOException e) {
@@ -447,10 +447,10 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
tableNameWithType);
return;
}
- if (segmentDataManager instanceof LLRealtimeSegmentDataManager) {
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
LOGGER.info("Reloading (force committing) consuming segment: {} in
table: {}", segmentName,
tableNameWithType);
- ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit();
+ ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
}
return;
} finally {
@@ -600,8 +600,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segName);
if (segmentDataManager != null) {
try {
- if (segmentDataManager instanceof LLRealtimeSegmentDataManager) {
- ((LLRealtimeSegmentDataManager)
segmentDataManager).forceCommit();
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
}
} finally {
tableDataManager.releaseSegment(segmentDataManager);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index 54667a43ac..83de35a63c 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableType;
@@ -66,7 +66,7 @@ public abstract class IngestionBasedConsumptionStatusChecker {
segName);
continue;
}
- if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
// There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
// segment data manager will not be of type
RealtimeSegmentDataManager.
_logger.info("Segment {} is already committed and is considered
caught up.", segName);
@@ -74,7 +74,7 @@ public abstract class IngestionBasedConsumptionStatusChecker {
continue;
}
- LLRealtimeSegmentDataManager rtSegmentDataManager =
(LLRealtimeSegmentDataManager) segmentDataManager;
+ RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
_caughtUpSegments.add(segName);
}
@@ -87,7 +87,7 @@ public abstract class IngestionBasedConsumptionStatusChecker {
return _consumingSegments.size() - _caughtUpSegments.size();
}
- protected abstract boolean isSegmentCaughtUp(String segmentName,
LLRealtimeSegmentDataManager rtSegmentDataManager);
+ protected abstract boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager);
private TableDataManager getTableDataManager(String segmentName) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index 5deafb2daf..6b597e3fa2 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -21,7 +21,7 @@ package org.apache.pinot.server.starter.helix;
import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -39,7 +39,7 @@ public class OffsetBasedConsumptionStatusChecker extends
IngestionBasedConsumpti
}
@Override
- protected boolean isSegmentCaughtUp(String segmentName,
LLRealtimeSegmentDataManager rtSegmentDataManager) {
+ protected boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager) {
StreamPartitionMsgOffset latestIngestedOffset =
rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset =
rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
if (latestStreamOffset == null || latestIngestedOffset == null) {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 1ad92ee96d..42d1642c7b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableType;
@@ -107,14 +107,14 @@ public class SegmentOnlineOfflineStateModelFactory
extends StateModelFactory<Sta
// TODO: https://github.com/apache/pinot/issues/10049
try {
- if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) {
+ if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) {
// We found an LLC segment that is not consuming right now, must be
that we already swapped it with a
// segment that has been built. Nothing to do for this state
transition.
_logger.info("Segment {} not an instance of
RealtimeSegmentDataManager. Reporting success for the transition",
acquiredSegment.getSegmentName());
return;
}
- LLRealtimeSegmentDataManager segmentDataManager =
(LLRealtimeSegmentDataManager) acquiredSegment;
+ RealtimeSegmentDataManager segmentDataManager =
(RealtimeSegmentDataManager) acquiredSegment;
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(),
realtimeTableName,
segmentName);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index cab178bc0d..6301b54d04 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -73,9 +73,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -119,7 +119,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
- private void setupLatestIngestionTimestamp(LLRealtimeSegmentDataManager
segmentDataManager,
+ private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager
segmentDataManager,
long latestIngestionTimestamp) {
MutableSegment mockSegment = mock(MutableSegment.class);
SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
@@ -148,9 +148,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -212,9 +212,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -285,9 +285,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -334,9 +334,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -392,9 +392,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index 20ad08a11d..88b05b8ff0 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
-import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.testng.annotations.Test;
@@ -53,9 +53,9 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -104,8 +104,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup some SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
@@ -120,7 +120,7 @@ public class OffsetBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
// setup the remaining SegmentDataManager
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
// latest ingested offset latest stream offset
@@ -161,9 +161,9 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
@@ -211,9 +211,9 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
// setup SegmentDataManagers
- LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
- LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]