This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch testing in repository https://gitbox.apache.org/repos/asf/pinot.git
commit e74f077fb551d368b494fa0931dde2379a406100 Author: jlli_LinkedIn <[email protected]> AuthorDate: Mon Jul 21 10:31:03 2025 -0700 Testing with S3 APIs --- pinot-common/pom.xml | 5 + .../common/utils/fetcher/LiSegmentFetcher.java | 106 ++++++ .../common/utils/filesystem/LiLocalPinotFS.java | 363 +++++++++++++++++++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 9 +- .../pinot/controller/helix/ControllerTest.java | 12 + .../tests/BaseClusterIntegrationTest.java | 2 +- .../tests/ClusterIntegrationTestUtils.java | 2 +- .../pinot/integration/tests/ClusterTest.java | 40 ++- .../tests/OfflineClusterIntegrationTest.java | 2 +- .../pinot/spi/filesystem/PinotFSFactory.java | 1 + 10 files changed, 523 insertions(+), 19 deletions(-) diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml index dac182cd9e..1160117b61 100644 --- a/pinot-common/pom.xml +++ b/pinot-common/pom.xml @@ -117,6 +117,11 @@ </plugins> </build> <dependencies> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <version>2.31.78</version> + </dependency> <dependency> <groupId>io.prometheus.jmx</groupId> <artifactId>jmx_prometheus_javaagent</artifactId> diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/LiSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/LiSegmentFetcher.java new file mode 100644 index 0000000000..86749dfbd3 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/LiSegmentFetcher.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils.fetcher; + +import java.io.File; +import java.net.URI; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.pinot.spi.env.PinotConfiguration; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + + +public class LiSegmentFetcher implements SegmentFetcher { + + private final static String MINIO_ENDPOINT = "http://127.0.0.1:9000"; + private final static String ACCESS_KEY = "minioadmin"; + private final static String SECRECT_KEY = "minioadmin"; + private final static String BUCKET_NAME = "pinot-bucket"; + private final static Pattern SEGMENT_PATTERN_IN_HTTP_URL = Pattern.compile("^http?://[^/]+/segments/(.+)"); + // Pattern.compile("^http://[^/]+/segments/([^/]+/[^/]+)$"); + + private S3Client _s3Client; + + @Override + public void init(PinotConfiguration config) { + AwsBasicCredentials creds = AwsBasicCredentials.create(ACCESS_KEY, SECRECT_KEY); + _s3Client = S3Client.builder().endpointOverride(URI.create(MINIO_ENDPOINT)) + .region(Region.US_EAST_1) // any region is okay for local + .credentialsProvider(StaticCredentialsProvider.create(creds)).build(); + } + + @Override + public void fetchSegmentToLocal(URI uri, File dest) + throws Exception { + String segmentKey = 
extractSegmentKey(uri); + _s3Client.getObject(b -> b.bucket(BUCKET_NAME).key(segmentKey), dest.toPath()); + } + + private String extractSegmentKey(URI uri) { + String protocol = uri.getScheme(); + switch (protocol) { + case "http": + return extractSegmentKeyFromHttpUrl(uri.toString()); + case "file": + return extractSegmentKeyFromFileUrl(uri.getPath()); + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + private String extractSegmentKeyFromHttpUrl(String uri) { + // url example: http://localhost:20000/segments/mytable/mytable_16071_16101_3 + Matcher matcher = SEGMENT_PATTERN_IN_HTTP_URL.matcher(uri); + if (matcher.find()) { + return matcher.group(1) + ".gz"; + } + throw new RuntimeException("Failed to extract segment key from download uri: " + uri); + } + + private String extractSegmentKeyFromFileUrl(String path) { + // not needed, only for integration test + String segmentName = path.substring(path.lastIndexOf("/") + 1, path.indexOf(".tar.gz")); + String tableName = segmentName.substring(0, segmentName.indexOf("_")); + return tableName + "/" + segmentName; + } + + @Override + public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit, AtomicInteger attempts) + throws Exception { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void fetchSegmentToLocal(List<URI> uri, File dest) + throws Exception { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest) + throws Exception { + throw new UnsupportedOperationException("Not implemented"); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/filesystem/LiLocalPinotFS.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/filesystem/LiLocalPinotFS.java new file mode 100644 index 0000000000..a1e75c3879 --- /dev/null +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/filesystem/LiLocalPinotFS.java @@ -0,0 +1,363 @@ +package org.apache.pinot.common.utils.filesystem; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.LocalPinotFS; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.CopyObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.MetadataDirective; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + + +public class LiLocalPinotFS extends LocalPinotFS { + + public final static String HTTP_SCHEME = "http"; + public final static String FILE_SCHEME = "file"; + private static final String DELIMITER = "/"; + private final static String MINIO_ENDPOINT = "http://127.0.0.1:9000"; + private final static String ACCESS_KEY = "minioadmin"; + private final static String SECRECT_KEY = "minioadmin"; + private final 
static String BUCKET_NAME = "pinot-bucket"; + private final static Pattern SEGMENT_PATTERN_IN_HTTP_URL = Pattern.compile("^http?://[^/]+/segments/(.+)"); + // Pattern.compile("^http://[^/]+/segments/([^/]+/[^/]+)$"); + + private S3Client _s3Client; + + @Override + public void init(PinotConfiguration configuration) { + AwsBasicCredentials creds = AwsBasicCredentials.create(ACCESS_KEY, SECRECT_KEY); + _s3Client = S3Client.builder().endpointOverride(URI.create(MINIO_ENDPOINT)) + .region(Region.US_EAST_1) // any region is okay for local + .credentialsProvider(StaticCredentialsProvider.create(creds)).build(); + } + + @Override + public boolean mkdir(URI uri) + throws IOException { + String protocol = uri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + return super.mkdir(uri); + case HTTP_SCHEME: + String segmentKey = extractSegmentKey(uri); + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(BUCKET_NAME) + .key(segmentKey) + .build(); + PutObjectResponse putObjectResponse = _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0])); + return putObjectResponse.sdkHttpResponse().isSuccessful(); + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + @Override + public boolean delete(URI segmentUri, boolean forceDelete) + throws IOException { + String protocol = segmentUri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + return super.delete(segmentUri, forceDelete); + case HTTP_SCHEME: + return true; +// String segmentKey = extractSegmentKey(segmentUri); +// try { +// if (isDirectory(segmentUri)) { +// if (!forceDelete) { +//// Preconditions.checkState(isEmptyDirectory(segmentUri), +//// "ForceDelete flag is not set and directory '%s' is not empty", segmentUri); +// } +// String prefix = normalizeToDirectoryPrefix(segmentUri); +// ListObjectsV2Response listObjectsV2Response; +// ListObjectsV2Request.Builder listObjectsV2RequestBuilder = +// 
ListObjectsV2Request.builder().bucket(segmentUri.getHost()); +// +// if (prefix.equals(DELIMITER)) { +// ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build(); +// listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request); +// } else { +// ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.prefix(prefix).build(); +// listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request); +// } +// boolean deleteSucceeded = true; +// for (S3Object s3Object : listObjectsV2Response.contents()) { +// DeleteObjectRequest deleteObjectRequest = +// DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(s3Object.key()).build(); +// +// DeleteObjectResponse deleteObjectResponse = _s3Client.deleteObject(deleteObjectRequest); +// +// deleteSucceeded &= deleteObjectResponse.sdkHttpResponse().isSuccessful(); +// } +// return deleteSucceeded; +// } else { +// String prefix = sanitizePath(segmentUri.getPath()); +// DeleteObjectRequest deleteObjectRequest = +// DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(prefix).build(); +// +// DeleteObjectResponse deleteObjectResponse = _s3Client.deleteObject(deleteObjectRequest); +// +// return deleteObjectResponse.sdkHttpResponse().isSuccessful(); +// } +// } catch (NoSuchKeyException e) { +// return false; +// } catch (S3Exception e) { +// throw e; +// } catch (Exception e) { +// throw new IOException(e); +// } + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + @Override + public boolean exists(URI fileUri) { + String protocol = fileUri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + return super.exists(fileUri); + case HTTP_SCHEME: + String segmentKey = extractSegmentKey(fileUri); + try { + return _s3Client.headObject(b -> b.bucket(BUCKET_NAME).key(segmentKey)).sdkHttpResponse().isSuccessful(); + } catch (NoSuchKeyException e) { + return false; + } + default: + throw new 
UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + @Override + public void copyFromLocalFile(File srcFile, URI dstUri) + throws Exception { + String protocol = dstUri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + super.copyFromLocalFile(srcFile, dstUri); + break; + case HTTP_SCHEME: + String segmentKey = extractSegmentKey(dstUri); + PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(BUCKET_NAME).key(segmentKey).build(); + _s3Client.putObject(putObjectRequest, RequestBody.fromFile(srcFile)); + break; + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + @Override + public long length(URI fileUri) { + String protocol = fileUri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + return super.length(fileUri); + case HTTP_SCHEME: + String segmentKey = extractSegmentKey(fileUri); + HeadObjectResponse headObjectResponse = _s3Client.headObject(b -> b.bucket(BUCKET_NAME).key(segmentKey)); + return headObjectResponse.contentLength(); + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + @Override + public String[] listFiles(URI fileUri, boolean recursive) + throws IOException { + String protocol = fileUri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + return super.listFiles(fileUri, recursive); + case HTTP_SCHEME: + String segmentKey = extractSegmentKey(fileUri); + List<S3Object> s3ObjectList = _s3Client.listObjectsV2(builder -> builder.bucket(BUCKET_NAME).prefix(segmentKey)).contents(); + List<String> files = new ArrayList<>(); + for (S3Object s3Object : s3ObjectList) { + files.add(s3Object.key()); + } + return files.toArray(new String[0]); + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + @Override + public boolean doMove(URI srcUri, URI dstUri) + throws IOException { + String protocol = srcUri.getScheme(); + switch (protocol) { + case FILE_SCHEME: + 
return super.doMove(srcUri, dstUri); + case HTTP_SCHEME: +// LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri); + Preconditions.checkState(exists(srcUri), "Source URI '%s' does not exist", srcUri); + if (srcUri.equals(dstUri)) { + return true; + } + if (!isDirectory(srcUri)) { + delete(dstUri, true); + return copyFile(srcUri, dstUri); + } + dstUri = normalizeToDirectoryUri(dstUri); + Path srcPath = Paths.get(srcUri.getPath()); + try { + boolean copySucceeded = true; + for (String filePath : listFiles(srcUri, true)) { + URI srcFileURI = URI.create(filePath); + String directoryEntryPrefix = srcFileURI.getPath(); + URI src = new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntryPrefix, null); + String relativeSrcPath = srcPath.relativize(Paths.get(directoryEntryPrefix)).toString(); + String dstPath = dstUri.resolve(relativeSrcPath).getPath(); + URI dst = new URI(dstUri.getScheme(), dstUri.getHost(), dstPath, null); + copySucceeded &= copyFile(src, dst); + } + return copySucceeded; + } catch (URISyntaxException e) { + throw new IOException(e); + } + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + private String extractSegmentKey(URI uri) { + String protocol = uri.getScheme(); + switch (protocol) { + case "http": + return extractSegmentKeyFromHttpUrl(uri.toString()); + case "file": + return extractSegmentKeyFromFileUrl(uri.getPath()); + default: + throw new UnsupportedOperationException("Unsupported protocol: " + protocol); + } + } + + private String extractSegmentKeyFromHttpUrl(String uri) { + // url example: http://localhost:20000/segments/mytable/mytable_16071_16101_3 + Matcher matcher = SEGMENT_PATTERN_IN_HTTP_URL.matcher(uri); + if (matcher.find()) { + String matchedString = matcher.group(1); + if (matchedString.contains("/")) { + return matchedString + ".gz"; + } else { + return matchedString; + } +// return matcher.group(1) + ".gz"; + } + throw new RuntimeException("Failed to extract segment key 
from download uri: " + uri); + } + + private String extractSegmentKeyFromFileUrl(String path) { + // not needed, only for integration test + String segmentName = path.substring(path.lastIndexOf("/") + 1, path.indexOf(".tar.gz")); + String tableName = segmentName.substring(0, segmentName.indexOf("_")); + return tableName + "/" + segmentName; + } + + private boolean isPathTerminatedByDelimiter(URI uri) { + return uri.getPath().endsWith(DELIMITER); + } + + private URI normalizeToDirectoryUri(URI uri) + throws IOException { + if (isPathTerminatedByDelimiter(uri)) { + return uri; + } + try { + return new URI(uri.getScheme(), uri.getHost(), sanitizePath(uri.getPath() + DELIMITER), null); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + private boolean copyFile(URI srcUri, URI dstUri) + throws IOException { + try { + String srcKey = extractSegmentKey(srcUri); + String dstKey = extractSegmentKey(dstUri); + CopyObjectResponse copyObjectResponse = _s3Client.copyObject( + builder -> builder.sourceBucket(BUCKET_NAME).sourceKey(srcKey).destinationBucket(BUCKET_NAME) + .destinationKey(dstKey)); + return copyObjectResponse.sdkHttpResponse().isSuccessful(); +// String encodedUrl = URLEncoder.encode(srcUri.getHost() + srcUri.getPath(), StandardCharsets.UTF_8); +// +// String dstPath = sanitizePath(dstUri.getPath()); +// CopyObjectRequest copyReq = generateCopyObjectRequest(encodedUrl, dstUri, dstPath, null); +// CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq); +// return copyObjectResponse.sdkHttpResponse().isSuccessful(); + } catch (S3Exception e) { + throw new IOException(e); + } + } + + private String sanitizePath(String path) { + path = path.replaceAll(DELIMITER + "+", DELIMITER); + if (path.startsWith(DELIMITER) && !path.equals(DELIMITER)) { + path = path.substring(1); + } + return path; + } + + private CopyObjectRequest generateCopyObjectRequest(String copySource, URI dest, String path, + Map<String, String> metadata) { + 
CopyObjectRequest.Builder copyReqBuilder = + CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path); +// if (_storageClass != null) { +// copyReqBuilder.storageClass(_storageClass); +// } + if (metadata != null) { + copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE); + } +// if (!_disableAcl) { + copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL); +// } +// if (_serverSideEncryption != null) { +// copyReqBuilder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId); +// if (_ssekmsEncryptionContext != null) { +// copyReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext); +// } +// } + return copyReqBuilder.build(); + } + + private String normalizeToDirectoryPrefix(URI uri) + throws IOException { + Preconditions.checkNotNull(uri, "uri is null"); + URI strippedUri = getBase(uri).relativize(uri); + if (isPathTerminatedByDelimiter(strippedUri)) { + return sanitizePath(strippedUri.getPath()); + } + return sanitizePath(strippedUri.getPath() + DELIMITER); + } + + private URI getBase(URI uri) + throws IOException { + try { + return new URI(uri.getScheme(), uri.getHost(), null, null); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 5f7c950178..d49b4b475a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -583,8 +583,10 @@ public class PinotLLCRealtimeSegmentManager { return; } - URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); - PinotFS pinotFS = 
PinotFSFactory.create(tableDirURI.getScheme()); + URI tableDirURI = URIUtils.getUri("http://localhost:20000/segments", rawTableName); + PinotFS pinotFS = PinotFSFactory.create("http"); +// URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); +// PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS); if (!isTmpSegmentAsyncDeletionEnabled()) { @@ -2415,7 +2417,8 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting URI createSegmentPath(String rawTableName, String segmentName) { - return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); + return URIUtils.getUri("http://localhost:20000/segments", rawTableName, URIUtils.encode(segmentName)); + //return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } /** diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index e3225dea09..0ad49cf1d1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -280,6 +280,18 @@ public class ControllerTest { properties.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false); properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC"); properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, true); + +// properties.put("pinot.controller.storage.factory.class.s3", "org.apache.pinot.plugin.filesystem.S3PinotFS"); +// properties.put("controller.data.dir", "s3://127.0.0.1:9000/pinot-bucket"); +// properties.put("pinot.controller.storage.factory.s3.region", "us-east-1"); +// properties.put("pinot.controller.segment.fetcher.protocols", "file,http,s3"); +// 
properties.put("pinot.controller.segment.fetcher.s3.class", "org.apache.pinot.common.utils.fetcher.LiSegmentFetcher"); +// properties.put("pinot.controller.storage.factory.s3.accessKey", "minioadmin"); +// properties.put("pinot.controller.storage.factory.s3.secretKey", "minioadmin"); + + properties.put("pinot.controller.storage.factory.class.http", "org.apache.pinot.common.utils.filesystem.LiLocalPinotFS"); + + overrideControllerConf(properties); return properties; } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 21f3dc3c6b..ddd8670c16 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -166,7 +166,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { } protected int getNumKafkaPartitions() { - return DEFAULT_LLC_NUM_KAFKA_PARTITIONS; + return DEFAULT_LLC_NUM_KAFKA_PARTITIONS; //1; //DEFAULT_LLC_NUM_KAFKA_PARTITIONS; } protected String getKafkaTopic() { diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index acf0b78d50..1cd735b2e3 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -327,7 +327,7 @@ public class ClusterIntegrationTestUtils { org.apache.pinot.spi.data.Schema schema, int segmentIndex, File segmentDir, File tarDir) throws Exception { // Test segment with space and special character in the 
file name - buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + " %", segmentDir, tarDir); + buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + "", segmentDir, tarDir); } /** diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index 4e0366af87..de09cfcf6a 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -79,6 +79,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Server; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.NetUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.intellij.lang.annotations.Language; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -242,6 +243,17 @@ public abstract class ClusterTest extends ControllerTest { protected PinotConfiguration getServerConf(int serverId) { PinotConfiguration serverConf = new PinotConfiguration(); + serverConf.setProperty("pinot.server.segment.fetcher.protocols", List.of("file", "http")); + serverConf.setProperty("pinot.server.segment.fetcher.file.class", + "org.apache.pinot.common.utils.fetcher.LiSegmentFetcher"); + serverConf.setProperty("pinot.server.segment.fetcher.http.class", + "org.apache.pinot.common.utils.fetcher.LiSegmentFetcher"); + serverConf.setProperty("pinot.server.storage.factory.class.http", "org.apache.pinot.common.utils.filesystem.LiLocalPinotFS"); + + // needed for realtime uploads + serverConf.setProperty("pinot.server.instance.segment.upload.to.deep.store", true); + serverConf.setProperty("pinot.server.instance.segment.store.uri", "http://localhost:20000/segments"); + serverConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, 
getZkUrl()); serverConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName()); serverConf.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, LOCAL_HOST); @@ -458,15 +470,9 @@ public abstract class ClusterTest extends ControllerTest { .availableProcessors())); List<Future<Integer>> futures = new ArrayList<>(numSegments); for (File segmentTarFile : segmentTarFiles) { - futures.add(executorService.submit(() -> { - if (System.currentTimeMillis() % 2 == 0) { - return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), - segmentTarFile, getSegmentUploadAuthHeaders(), tableName, tableType).getStatusCode(); - } else { - return uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient, - segmentTarFile); - } - })); + futures.add(executorService.submit( + () -> uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient, + segmentTarFile))); } executorService.shutdown(); for (Future<Integer> future : futures) { @@ -479,11 +485,19 @@ public abstract class ClusterTest extends ControllerTest { private int uploadSegmentWithOnlyMetadata(String tableName, TableType tableType, URI uploadSegmentHttpURI, FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile) throws IOException, HttpErrorStatusException { + String controllerSegmentsEndpoint = "http://localhost:20000/segments"; + String fileName = URIUtils.encode(segmentTarFile.getName()); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String segmentName = fileName.substring(0, fileName.indexOf(".tar.gz")); List<Header> headers = new ArrayList<>(List.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, - "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" - + URIUtils.encode(segmentTarFile.getName())), - new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, - FileUploadDownloadClient.FileUploadType.METADATA.toString()))); + 
controllerSegmentsEndpoint + "/" + rawTableName + "/" + segmentName), + new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString()))); +// List<Header> headers = new ArrayList<>(List.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, +// String.format("file://%s/%s", segmentTarFile.getParentFile().getAbsolutePath(), +// URIUtils.encode(segmentTarFile.getName()))), +// new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, +// FileUploadDownloadClient.FileUploadType.METADATA.toString()))); headers.addAll(getSegmentUploadAuthHeaders()); // Add table name and table type as request parameters NameValuePair tableNameValuePair = diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 36af424482..8664c19263 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -257,7 +257,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet List<File> tarDirs = new ArrayList<>(); tarDirs.add(_tarDir); - tarDirs.add(tarDir2); +// tarDirs.add(tarDir2); try { uploadSegments(getTableName(), TableType.OFFLINE, tarDirs); } catch (Exception e) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java index eafba8a256..ee24387462 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java @@ -42,6 +42,7 @@ public class PinotFSFactory { private static final Map<String, PinotFS> 
PINOT_FS_MAP = new HashMap<String, PinotFS>() { { put(LOCAL_PINOT_FS_SCHEME, new NoClosePinotFS(new LocalPinotFS())); +// put("http", new NoClosePinotFS(new LiLocalPinotFS())); } }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org
