This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 31c55af [Part 3.1] Deepstore by-pass: Add a new best effort segment
uploader with bounded upload time and d… (#5314)
31c55af is described below
commit 31c55afdb6a40f98189308ce6292587ead9d0dec
Author: Ting Chen <[email protected]>
AuthorDate: Tue May 5 14:20:20 2020 -0700
[Part 3.1] Deepstore by-pass: Add a new best effort segment uploader with
bounded upload time and d… (#5314)
* Add a new best effort segment uploader with bounded upload time and
default segment location when upload fails.
* Update
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/BestEffortSegmentUploader.java
Co-authored-by: Subbu Subramaniam <[email protected]>
* Update
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
Co-authored-by: Subbu Subramaniam <[email protected]>
* Update
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/BestEffortSegmentUploader.java
Co-authored-by: Subbu Subramaniam <[email protected]>
* Revised based on comments.
* Change comments.
* Revised the splitcommiter.
* Update
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
Co-authored-by: Subbu Subramaniam <[email protected]>
* Further revise.
Co-authored-by: Subbu Subramaniam <[email protected]>
---
.../realtime/LLRealtimeSegmentDataManager.java | 2 +-
.../manager/realtime/PinotFSSegmentUploader.java | 86 ++++++++++
.../manager/realtime/SegmentCommitterFactory.java | 13 +-
.../data/manager/realtime/SegmentUploader.java | 3 +-
.../realtime/Server2ControllerSegmentUploader.java | 3 +-
.../manager/realtime/SplitSegmentCommitter.java | 20 +--
.../realtime/PinotFSSegmentUploaderTest.java | 187 +++++++++++++++++++++
.../Server2ControllerSegmentUploaderTest.java | 8 +-
.../DefaultCommitterRealtimeIntegrationTest.java | 6 +-
9 files changed, 296 insertions(+), 32 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index dcad570..033a220 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1231,7 +1231,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_consumeEndTime = now + minConsumeTimeMillis;
}
- _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger,
_indexLoadingConfig, _protocolHandler);
+ _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger,
_protocolHandler);
segmentLogger
.info("Starting consumption on realtime consuming segment {}
maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
new file mode 100644
index 0000000..5dae4e4
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A segment uploader which does segment upload to a segment store (with store
root dir configured as
+ * _segmentStoreUriStr) using PinotFS within a configurable timeout period.
The final segment location would be in the
+ * URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful.
+ */
+public class PinotFSSegmentUploader implements SegmentUploader {
+ private Logger LOGGER =
LoggerFactory.getLogger(PinotFSSegmentUploader.class);
+ private String _segmentStoreUriStr;
+ private ExecutorService _executorService = Executors.newCachedThreadPool();
+ private int _timeoutInMs;
+
+ public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis) {
+ _segmentStoreUriStr = segmentStoreDirUri;
+ _timeoutInMs = timeoutMillis;
+ }
+
+ public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
+ if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
+ return null;
+ }
+ Callable<URI> uploadTask = () -> {
+ URI destUri = new URI(StringUtil
+ .join(File.separator, _segmentStoreUriStr,
segmentName.getTableName(), segmentName.getSegmentName()));
+ try {
+ PinotFS pinotFS = PinotFSFactory.create(new
URI(_segmentStoreUriStr).getScheme());
+ // Check and delete any existing segment file.
+ if (pinotFS.exists(destUri)) {
+ pinotFS.delete(destUri, true);
+ }
+ pinotFS.copyFromLocalFile(segmentFile, destUri);
+ return destUri;
+ } catch (Exception e) {
+ LOGGER.warn("Failed copy segment tar file {} to segment store {}: {}",
segmentFile.getName(), destUri, e);
+ }
+ return null;
+ };
+ Future<URI> future = _executorService.submit(uploadTask);
+ try {
+ URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
+ return segmentLocation;
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted while waiting for segment upload of {} to {}.",
segmentName, _segmentStoreUriStr);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOGGER
+ .warn("Failed to upload file {} of segment {} for table {} ",
segmentFile.getAbsolutePath(), segmentName, e);
+ }
+
+ return null;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 84e9c6e..df0f6b8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -19,7 +19,6 @@
package org.apache.pinot.core.data.manager.realtime;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.slf4j.Logger;
@@ -29,18 +28,16 @@ import org.slf4j.Logger;
*/
public class SegmentCommitterFactory {
private static Logger LOGGER;
- private final IndexLoadingConfig _indexLoadingConfig;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
- public SegmentCommitterFactory(Logger segmentLogger, IndexLoadingConfig
indexLoadingConfig,
- ServerSegmentCompletionProtocolHandler protocolHandler) {
+ public SegmentCommitterFactory(Logger segmentLogger,
ServerSegmentCompletionProtocolHandler protocolHandler) {
LOGGER = segmentLogger;
- _indexLoadingConfig = indexLoadingConfig;
_protocolHandler = protocolHandler;
}
-
- public SegmentCommitter
createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
SegmentUploader segmentUploader) {
- return new SplitSegmentCommitter(LOGGER, _protocolHandler,
_indexLoadingConfig, params, segmentUploader);
+
+ public SegmentCommitter
createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
+ SegmentUploader segmentUploader) {
+ return new SplitSegmentCommitter(LOGGER, _protocolHandler, params,
segmentUploader);
}
public SegmentCommitter
createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
index 7cefdee..44d0a36 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
@@ -20,9 +20,10 @@ package org.apache.pinot.core.data.manager.realtime;
import java.io.File;
import java.net.URI;
+import org.apache.pinot.common.utils.LLCSegmentName;
public interface SegmentUploader {
// Returns the URI of the uploaded segment. null if the upload fails.
- URI uploadSegment(File segmentFile);
+ URI uploadSegment(File segmentFile, LLCSegmentName segmentName);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 91b177d..35084aa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.util.SegmentCompletionProtocolUtils;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class Server2ControllerSegmentUploader implements
SegmentUploader {
}
@Override
- public URI uploadSegment(File segmentFile) {
+ public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
SegmentCompletionProtocol.Response response =
uploadSegmentToController(segmentFile);
if (response.getStatus() ==
SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
try {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 60938ef..53bc82d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.data.manager.realtime;
import java.io.File;
import java.net.URI;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.slf4j.Logger;
@@ -33,16 +33,13 @@ import org.slf4j.Logger;
public class SplitSegmentCommitter implements SegmentCommitter {
private final SegmentCompletionProtocol.Request.Params _params;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
- private final IndexLoadingConfig _indexLoadingConfig;
private final SegmentUploader _segmentUploader;
-
private final Logger _segmentLogger;
public SplitSegmentCommitter(Logger segmentLogger,
ServerSegmentCompletionProtocolHandler protocolHandler,
- IndexLoadingConfig indexLoadingConfig,
SegmentCompletionProtocol.Request.Params params, SegmentUploader
segmentUploader) {
+ SegmentCompletionProtocol.Request.Params params, SegmentUploader
segmentUploader) {
_segmentLogger = segmentLogger;
_protocolHandler = protocolHandler;
- _indexLoadingConfig = indexLoadingConfig;
_params = new SegmentCompletionProtocol.Request.Params(params);
_segmentUploader = segmentUploader;
}
@@ -58,19 +55,14 @@ public class SplitSegmentCommitter implements
SegmentCommitter {
return SegmentCompletionProtocol.RESP_FAILED;
}
- URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile);
+ URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new
LLCSegmentName(_params.getSegmentName()));
if (segmentLocation == null) {
- return SegmentCompletionProtocol.RESP_FAILED;
+ return SegmentCompletionProtocol.RESP_FAILED;
}
_params.withSegmentLocation(segmentLocation.toString());
- SegmentCompletionProtocol.Response commitEndResponse;
- if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
- commitEndResponse =
- _protocolHandler.segmentCommitEndWithMetadata(_params,
segmentBuildDescriptor.getMetadataFiles());
- } else {
- commitEndResponse = _protocolHandler.segmentCommitEnd(_params);
- }
+ SegmentCompletionProtocol.Response commitEndResponse =
+ _protocolHandler.segmentCommitEndWithMetadata(_params,
segmentBuildDescriptor.getMetadataFiles());
if
(!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
{
_segmentLogger.warn("CommitEnd failed with response {}",
commitEndResponse.toJsonString());
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
new file mode 100644
index 0000000..7d1cd2d
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PinotFSSegmentUploaderTest {
+ private static final int TIMEOUT_IN_MS = 100;
+ private File _file;
+ private LLCSegmentName _llcSegmentName;
+ @BeforeClass
+ public void setUp()
+ throws URISyntaxException, IOException, HttpErrorStatusException {
+ Configuration fsConfig = new PropertiesConfiguration();
+ fsConfig.setProperty("class.hdfs",
"org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysSucceedPinotFS");
+ fsConfig.setProperty("class.timeout",
"org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysTimeoutPinotFS");
+ fsConfig.setProperty("class.existing",
"org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysExistPinotFS");
+ PinotFSFactory.init(fsConfig);
+ _file = FileUtils.getFile(FileUtils.getTempDirectory(),
UUID.randomUUID().toString());
+ _file.deleteOnExit();
+ _llcSegmentName = new LLCSegmentName("test_REALTIME", 1, 0,
System.currentTimeMillis());
+ }
+
+ @Test
+ public void testSuccessfulUpload() {
+ SegmentUploader segmentUploader = new
PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS);
+ URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+ Assert.assertEquals(segmentURI.toString(),
StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(),
_llcSegmentName.getSegmentName()));
+ }
+
+ @Test
+ public void testSegmentAlreadyExist() {
+ SegmentUploader segmentUploader = new
PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS);
+ URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+ Assert.assertEquals(segmentURI.toString(),
StringUtil.join(File.separator,"existing://root",
_llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()));
+ }
+
+ @Test
+ public void testUploadTimeOut() {
+ SegmentUploader segmentUploader = new
PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS);
+ URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+ Assert.assertNull(segmentURI);
+ }
+
+ @Test
+ public void testNoSegmentStoreConfigured() {
+ SegmentUploader segmentUploader = new PinotFSSegmentUploader("",
TIMEOUT_IN_MS);
+ URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+ Assert.assertNull(segmentURI);
+ }
+
+ public static class AlwaysSucceedPinotFS extends PinotFS {
+
+ @Override
+ public void init(Configuration config) {
+
+ }
+
+ @Override
+ public boolean mkdir(URI uri)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(URI segmentUri, boolean forceDelete)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean doMove(URI srcUri, URI dstUri)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean copy(URI srcUri, URI dstUri)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean exists(URI fileUri)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public long length(URI fileUri)
+ throws IOException {
+ return 0;
+ }
+
+ @Override
+ public String[] listFiles(URI fileUri, boolean recursive)
+ throws IOException {
+ return new String[0];
+ }
+
+ @Override
+ public void copyToLocalFile(URI srcUri, File dstFile)
+ throws Exception {
+ }
+
+ @Override
+ public void copyFromLocalFile(File srcFile, URI dstUri)
+ throws Exception {
+ }
+
+ @Override
+ public boolean isDirectory(URI uri)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public long lastModified(URI uri)
+ throws IOException {
+ return 0;
+ }
+
+ @Override
+ public boolean touch(URI uri)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public InputStream open(URI uri)
+ throws IOException {
+ return null;
+ }
+ }
+
+ public static class AlwaysTimeoutPinotFS extends AlwaysSucceedPinotFS {
+ @Override
+ public void copyFromLocalFile(File srcFile, URI dstUri)
+ throws Exception {
+ // Make sure the sleep time > the timeout threshold of uploader.
+ Thread.sleep(TIMEOUT_IN_MS * 1000);
+ }
+ }
+
+ public static class AlwaysExistPinotFS extends AlwaysSucceedPinotFS {
+ @Override
+ public boolean exists(URI fileUri)
+ throws IOException {
+ return true;
+ }
+ }
+
+}
+
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
index 4bad294..85f84f1 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
@@ -28,6 +28,7 @@ import
org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class Server2ControllerSegmentUploaderTest {
private static Logger _logger =
LoggerFactory.getLogger(Server2ControllerSegmentUploaderTest.class);
private FileUploadDownloadClient _fileUploadDownloadClient;
private File _file;
+ private LLCSegmentName _llcSegmentName;
@BeforeClass
public void setUp()
@@ -75,6 +77,8 @@ public class Server2ControllerSegmentUploaderTest {
_file = FileUtils.getFile(FileUtils.getTempDirectory(),
UUID.randomUUID().toString());
_file.deleteOnExit();
+
+ _llcSegmentName = new LLCSegmentName("test_REALTIME", 1, 0,
System.currentTimeMillis());
}
@AfterClass
@@ -88,7 +92,7 @@ public class Server2ControllerSegmentUploaderTest {
Server2ControllerSegmentUploader uploader =
new Server2ControllerSegmentUploader(_logger,
_fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName",
10000, mock(ServerMetrics.class));
- URI segmentURI = uploader.uploadSegment(_file);
+ URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION);
}
@@ -98,7 +102,7 @@ public class Server2ControllerSegmentUploaderTest {
Server2ControllerSegmentUploader uploader =
new Server2ControllerSegmentUploader(_logger,
_fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName",
10000, mock(ServerMetrics.class));
- URI segmentURI = uploader.uploadSegment(_file);
+ URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
Assert.assertNull(segmentURI);
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
index 30c8d4b..adcb014 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
@@ -33,7 +33,6 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentCommitter;
import org.apache.pinot.core.data.manager.realtime.SegmentCommitterFactory;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.data.readers.PinotSegmentUtil;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -96,12 +95,10 @@ public class DefaultCommitterRealtimeIntegrationTest
extends RealtimeClusterInte
@Test
public void testDefaultCommitter()
throws Exception {
- IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
ServerMetrics serverMetrics = new ServerMetrics(new MetricsRegistry());
ServerSegmentCompletionProtocolHandler protocolHandler =
new ServerSegmentCompletionProtocolHandler(serverMetrics,
getTableName());
- SegmentCompletionProtocol.Response prevResponse = new
SegmentCompletionProtocol.Response();
LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor
=
mock(LLRealtimeSegmentDataManager.SegmentBuildDescriptor.class);
@@ -134,8 +131,7 @@ public class DefaultCommitterRealtimeIntegrationTest
extends RealtimeClusterInte
sendGetRequest("http://localhost:" + DEFAULT_CONTROLLER_PORT +
"/segmentConsumed?instance=" + instanceId + "&name="
+ segmentName + "&offset=" + END_OFFSET);
- SegmentCommitterFactory segmentCommitterFactory =
- new SegmentCommitterFactory(LOGGER, indexLoadingConfig,
protocolHandler);
+ SegmentCommitterFactory segmentCommitterFactory = new
SegmentCommitterFactory(LOGGER, protocolHandler);
SegmentCommitter segmentCommitter =
segmentCommitterFactory.createDefaultSegmentCommitter(params);
segmentCommitter.commit(END_OFFSET, 3, segmentBuildDescriptor);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]