This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch testing
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit e74f077fb551d368b494fa0931dde2379a406100
Author: jlli_LinkedIn <[email protected]>
AuthorDate: Mon Jul 21 10:31:03 2025 -0700

    Testing with S3 APIs
---
 pinot-common/pom.xml                               |   5 +
 .../common/utils/fetcher/LiSegmentFetcher.java     | 106 ++++++
 .../common/utils/filesystem/LiLocalPinotFS.java    | 363 +++++++++++++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   9 +-
 .../pinot/controller/helix/ControllerTest.java     |  12 +
 .../tests/BaseClusterIntegrationTest.java          |   2 +-
 .../tests/ClusterIntegrationTestUtils.java         |   2 +-
 .../pinot/integration/tests/ClusterTest.java       |  40 ++-
 .../tests/OfflineClusterIntegrationTest.java       |   2 +-
 .../pinot/spi/filesystem/PinotFSFactory.java       |   1 +
 10 files changed, 523 insertions(+), 19 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index dac182cd9e..1160117b61 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -117,6 +117,11 @@
     </plugins>
   </build>
   <dependencies>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+      <version>2.31.78</version>
+    </dependency>
     <dependency>
       <groupId>io.prometheus.jmx</groupId>
       <artifactId>jmx_prometheus_javaagent</artifactId>
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/LiSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/LiSegmentFetcher.java
new file mode 100644
index 0000000000..86749dfbd3
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/LiSegmentFetcher.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.fetcher;
+
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+
+public class LiSegmentFetcher implements SegmentFetcher {
+
+  private final static String MINIO_ENDPOINT = "http://127.0.0.1:9000";
+  private final static String ACCESS_KEY = "minioadmin";
+  private final static String SECRECT_KEY = "minioadmin";
+  private final static String BUCKET_NAME = "pinot-bucket";
+  private final static Pattern SEGMENT_PATTERN_IN_HTTP_URL = Pattern.compile("^http?://[^/]+/segments/(.+)");
+  // Pattern.compile("^http://[^/]+/segments/([^/]+/[^/]+)$");
+
+  private S3Client _s3Client;
+
+  @Override
+  public void init(PinotConfiguration config) {
+    AwsBasicCredentials creds = AwsBasicCredentials.create(ACCESS_KEY, SECRECT_KEY);
+    _s3Client = S3Client.builder().endpointOverride(URI.create(MINIO_ENDPOINT))
+        .region(Region.US_EAST_1) // any region is okay for local
+        .credentialsProvider(StaticCredentialsProvider.create(creds)).build();
+  }
+
+  @Override
+  public void fetchSegmentToLocal(URI uri, File dest)
+      throws Exception {
+    String segmentKey = extractSegmentKey(uri);
+    _s3Client.getObject(b -> b.bucket(BUCKET_NAME).key(segmentKey), dest.toPath());
+  }
+
+  private String extractSegmentKey(URI uri) {
+    String protocol = uri.getScheme();
+    switch (protocol) {
+      case "http":
+        return extractSegmentKeyFromHttpUrl(uri.toString());
+      case "file":
+        return extractSegmentKeyFromFileUrl(uri.getPath());
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + protocol);
+    }
+  }
+
+  private String extractSegmentKeyFromHttpUrl(String uri) {
+    // url example: http://localhost:20000/segments/mytable/mytable_16071_16101_3
+    Matcher matcher = SEGMENT_PATTERN_IN_HTTP_URL.matcher(uri);
+    if (matcher.find()) {
+      return matcher.group(1) + ".gz";
+    }
+    throw new RuntimeException("Failed to extract segment key from download uri: " + uri);
+  }
+
+  private String extractSegmentKeyFromFileUrl(String path) {
+    // not needed, only for integration test
+    String segmentName = path.substring(path.lastIndexOf("/") + 1, path.indexOf(".tar.gz"));
+    String tableName = segmentName.substring(0, segmentName.indexOf("_"));
+    return tableName + "/" + segmentName;
+  }
+
+  @Override
+  public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit, AtomicInteger attempts)
+      throws Exception {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void fetchSegmentToLocal(List<URI> uri, File dest)
+      throws Exception {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest)
+      throws Exception {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/filesystem/LiLocalPinotFS.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/filesystem/LiLocalPinotFS.java
new file mode 100644
index 0000000000..a1e75c3879
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/filesystem/LiLocalPinotFS.java
@@ -0,0 +1,363 @@
+package org.apache.pinot.common.utils.filesystem;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.MetadataDirective;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+
+public class LiLocalPinotFS extends LocalPinotFS {
+
+  public final static String HTTP_SCHEME = "http";
+  public final static String FILE_SCHEME = "file";
+  private static final String DELIMITER = "/";
+  private final static String MINIO_ENDPOINT = "http://127.0.0.1:9000";
+  private final static String ACCESS_KEY = "minioadmin";
+  private final static String SECRECT_KEY = "minioadmin";
+  private final static String BUCKET_NAME = "pinot-bucket";
+  private final static Pattern SEGMENT_PATTERN_IN_HTTP_URL = Pattern.compile("^http?://[^/]+/segments/(.+)");
+  // Pattern.compile("^http://[^/]+/segments/([^/]+/[^/]+)$");
+
+  private S3Client _s3Client;
+
+  @Override
+  public void init(PinotConfiguration configuration) {
+    AwsBasicCredentials creds = AwsBasicCredentials.create(ACCESS_KEY, SECRECT_KEY);
+    _s3Client = S3Client.builder().endpointOverride(URI.create(MINIO_ENDPOINT))
+        .region(Region.US_EAST_1) // any region is okay for local
+        .credentialsProvider(StaticCredentialsProvider.create(creds)).build();
+  }
+
+  @Override
+  public boolean mkdir(URI uri)
+      throws IOException {
+    String protocol = uri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        return super.mkdir(uri);
+      case HTTP_SCHEME:
+        String segmentKey = extractSegmentKey(uri);
+        PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+            .bucket(BUCKET_NAME)
+            .key(segmentKey)
+            .build();
+        PutObjectResponse putObjectResponse = 
_s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
+        return putObjectResponse.sdkHttpResponse().isSuccessful();
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  @Override
+  public boolean delete(URI segmentUri, boolean forceDelete)
+      throws IOException {
+    String protocol = segmentUri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        return super.delete(segmentUri, forceDelete);
+      case HTTP_SCHEME:
+        return true;
+//        String segmentKey = extractSegmentKey(segmentUri);
+//        try {
+//          if (isDirectory(segmentUri)) {
+//            if (!forceDelete) {
+////              Preconditions.checkState(isEmptyDirectory(segmentUri),
+////                  "ForceDelete flag is not set and directory '%s' is not 
empty", segmentUri);
+//            }
+//            String prefix = normalizeToDirectoryPrefix(segmentUri);
+//            ListObjectsV2Response listObjectsV2Response;
+//            ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
+//                ListObjectsV2Request.builder().bucket(segmentUri.getHost());
+//
+//            if (prefix.equals(DELIMITER)) {
+//              ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
+//              listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+//            } else {
+//              ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.prefix(prefix).build();
+//              listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+//            }
+//            boolean deleteSucceeded = true;
+//            for (S3Object s3Object : listObjectsV2Response.contents()) {
+//              DeleteObjectRequest deleteObjectRequest =
+//                  
DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(s3Object.key()).build();
+//
+//              DeleteObjectResponse deleteObjectResponse = 
_s3Client.deleteObject(deleteObjectRequest);
+//
+//              deleteSucceeded &= 
deleteObjectResponse.sdkHttpResponse().isSuccessful();
+//            }
+//            return deleteSucceeded;
+//          } else {
+//            String prefix = sanitizePath(segmentUri.getPath());
+//            DeleteObjectRequest deleteObjectRequest =
+//                
DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(prefix).build();
+//
+//            DeleteObjectResponse deleteObjectResponse = 
_s3Client.deleteObject(deleteObjectRequest);
+//
+//            return deleteObjectResponse.sdkHttpResponse().isSuccessful();
+//          }
+//        } catch (NoSuchKeyException e) {
+//          return false;
+//        } catch (S3Exception e) {
+//          throw e;
+//        } catch (Exception e) {
+//          throw new IOException(e);
+//        }
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  @Override
+  public boolean exists(URI fileUri) {
+    String protocol = fileUri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        return super.exists(fileUri);
+      case HTTP_SCHEME:
+        String segmentKey = extractSegmentKey(fileUri);
+        try {
+          return _s3Client.headObject(b -> 
b.bucket(BUCKET_NAME).key(segmentKey)).sdkHttpResponse().isSuccessful();
+        } catch (NoSuchKeyException e) {
+          return false;
+        }
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  @Override
+  public void copyFromLocalFile(File srcFile, URI dstUri)
+      throws Exception {
+    String protocol = dstUri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        super.copyFromLocalFile(srcFile, dstUri);
+        break;
+      case HTTP_SCHEME:
+        String segmentKey = extractSegmentKey(dstUri);
+        PutObjectRequest putObjectRequest = 
PutObjectRequest.builder().bucket(BUCKET_NAME).key(segmentKey).build();
+        _s3Client.putObject(putObjectRequest, RequestBody.fromFile(srcFile));
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  @Override
+  public long length(URI fileUri) {
+    String protocol = fileUri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        return super.length(fileUri);
+      case HTTP_SCHEME:
+        String segmentKey = extractSegmentKey(fileUri);
+        HeadObjectResponse headObjectResponse = _s3Client.headObject(b -> 
b.bucket(BUCKET_NAME).key(segmentKey));
+        return headObjectResponse.contentLength();
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  @Override
+  public String[] listFiles(URI fileUri, boolean recursive)
+      throws IOException {
+    String protocol = fileUri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        return super.listFiles(fileUri, recursive);
+      case HTTP_SCHEME:
+        String segmentKey = extractSegmentKey(fileUri);
+        List<S3Object> s3ObjectList = _s3Client.listObjectsV2(builder -> 
builder.bucket(BUCKET_NAME).prefix(segmentKey)).contents();
+        List<String> files = new ArrayList<>();
+        for (S3Object s3Object : s3ObjectList) {
+          files.add(s3Object.key());
+        }
+        return files.toArray(new String[0]);
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  @Override
+  public boolean doMove(URI srcUri, URI dstUri)
+      throws IOException {
+    String protocol = srcUri.getScheme();
+    switch (protocol) {
+      case FILE_SCHEME:
+        return super.doMove(srcUri, dstUri);
+      case HTTP_SCHEME:
+//        LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
+        Preconditions.checkState(exists(srcUri), "Source URI '%s' does not exist", srcUri);
+        if (srcUri.equals(dstUri)) {
+          return true;
+        }
+        if (!isDirectory(srcUri)) {
+          delete(dstUri, true);
+          return copyFile(srcUri, dstUri);
+        }
+        dstUri = normalizeToDirectoryUri(dstUri);
+        Path srcPath = Paths.get(srcUri.getPath());
+        try {
+          boolean copySucceeded = true;
+          for (String filePath : listFiles(srcUri, true)) {
+            URI srcFileURI = URI.create(filePath);
+            String directoryEntryPrefix = srcFileURI.getPath();
+            URI src = new URI(srcUri.getScheme(), srcUri.getHost(), 
directoryEntryPrefix, null);
+            String relativeSrcPath = 
srcPath.relativize(Paths.get(directoryEntryPrefix)).toString();
+            String dstPath = dstUri.resolve(relativeSrcPath).getPath();
+            URI dst = new URI(dstUri.getScheme(), dstUri.getHost(), dstPath, 
null);
+            copySucceeded &= copyFile(src, dst);
+          }
+          return copySucceeded;
+        } catch (URISyntaxException e) {
+          throw new IOException(e);
+        }
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  private String extractSegmentKey(URI uri) {
+    String protocol = uri.getScheme();
+    switch (protocol) {
+      case "http":
+        return extractSegmentKeyFromHttpUrl(uri.toString());
+      case "file":
+        return extractSegmentKeyFromFileUrl(uri.getPath());
+      default:
+        throw new UnsupportedOperationException("Unsupported protocol: " + 
protocol);
+    }
+  }
+
+  private String extractSegmentKeyFromHttpUrl(String uri) {
+    // url example: 
http://localhost:20000/segments/mytable/mytable_16071_16101_3
+    Matcher matcher = SEGMENT_PATTERN_IN_HTTP_URL.matcher(uri);
+    if (matcher.find()) {
+      String matchedString = matcher.group(1);
+      if (matchedString.contains("/")) {
+        return matchedString + ".gz";
+      } else {
+        return matchedString;
+      }
+//      return matcher.group(1) + ".gz";
+    }
+    throw new RuntimeException("Failed to extract segment key from download uri: " + uri);
+  }
+
+  private String extractSegmentKeyFromFileUrl(String path) {
+    // not needed, only for integration test
+    String segmentName = path.substring(path.lastIndexOf("/") + 1, 
path.indexOf(".tar.gz"));
+    String tableName = segmentName.substring(0, segmentName.indexOf("_"));
+    return tableName + "/" + segmentName;
+  }
+
+  private boolean isPathTerminatedByDelimiter(URI uri) {
+    return uri.getPath().endsWith(DELIMITER);
+  }
+
+  private URI normalizeToDirectoryUri(URI uri)
+      throws IOException {
+    if (isPathTerminatedByDelimiter(uri)) {
+      return uri;
+    }
+    try {
+      return new URI(uri.getScheme(), uri.getHost(), 
sanitizePath(uri.getPath() + DELIMITER), null);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private boolean copyFile(URI srcUri, URI dstUri)
+      throws IOException {
+    try {
+      String srcKey = extractSegmentKey(srcUri);
+      String dstKey = extractSegmentKey(dstUri);
+      CopyObjectResponse copyObjectResponse = _s3Client.copyObject(
+          builder -> 
builder.sourceBucket(BUCKET_NAME).sourceKey(srcKey).destinationBucket(BUCKET_NAME)
+              .destinationKey(dstKey));
+      return copyObjectResponse.sdkHttpResponse().isSuccessful();
+//      String encodedUrl = URLEncoder.encode(srcUri.getHost() + 
srcUri.getPath(), StandardCharsets.UTF_8);
+//
+//      String dstPath = sanitizePath(dstUri.getPath());
+//      CopyObjectRequest copyReq = generateCopyObjectRequest(encodedUrl, 
dstUri, dstPath, null);
+//      CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq);
+//      return copyObjectResponse.sdkHttpResponse().isSuccessful();
+    } catch (S3Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String sanitizePath(String path) {
+    path = path.replaceAll(DELIMITER + "+", DELIMITER);
+    if (path.startsWith(DELIMITER) && !path.equals(DELIMITER)) {
+      path = path.substring(1);
+    }
+    return path;
+  }
+
+  private CopyObjectRequest generateCopyObjectRequest(String copySource, URI 
dest, String path,
+      Map<String, String> metadata) {
+    CopyObjectRequest.Builder copyReqBuilder =
+        
CopyObjectRequest.builder().copySource(copySource).destinationBucket(dest.getHost()).destinationKey(path);
+//    if (_storageClass != null) {
+//      copyReqBuilder.storageClass(_storageClass);
+//    }
+    if (metadata != null) {
+      
copyReqBuilder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE);
+    }
+//    if (!_disableAcl) {
+      copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+//    }
+//    if (_serverSideEncryption != null) {
+//      
copyReqBuilder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId);
+//      if (_ssekmsEncryptionContext != null) {
+//        copyReqBuilder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
+//      }
+//    }
+    return copyReqBuilder.build();
+  }
+
+  private String normalizeToDirectoryPrefix(URI uri)
+      throws IOException {
+    Preconditions.checkNotNull(uri, "uri is null");
+    URI strippedUri = getBase(uri).relativize(uri);
+    if (isPathTerminatedByDelimiter(strippedUri)) {
+      return sanitizePath(strippedUri.getPath());
+    }
+    return sanitizePath(strippedUri.getPath() + DELIMITER);
+  }
+
+  private URI getBase(URI uri)
+      throws IOException {
+    try {
+      return new URI(uri.getScheme(), uri.getHost(), null, null);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5f7c950178..d49b4b475a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -583,8 +583,10 @@ public class PinotLLCRealtimeSegmentManager {
       return;
     }
 
-    URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
-    PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+    URI tableDirURI = URIUtils.getUri("http://localhost:20000/segments", rawTableName);
+    PinotFS pinotFS = PinotFSFactory.create("http");
+//    URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
+//    PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
     String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS);
 
     if (!isTmpSegmentAsyncDeletionEnabled()) {
@@ -2415,7 +2417,8 @@ public class PinotLLCRealtimeSegmentManager {
 
   @VisibleForTesting
   URI createSegmentPath(String rawTableName, String segmentName) {
-    return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
+    return URIUtils.getUri("http://localhost:20000/segments", rawTableName, URIUtils.encode(segmentName));
+    //return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e3225dea09..0ad49cf1d1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -280,6 +280,18 @@ public class ControllerTest {
     properties.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
     properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
     properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, true);
+
+//    properties.put("pinot.controller.storage.factory.class.s3", 
"org.apache.pinot.plugin.filesystem.S3PinotFS");
+//    properties.put("controller.data.dir", 
"s3://127.0.0.1:9000/pinot-bucket");
+//    properties.put("pinot.controller.storage.factory.s3.region", 
"us-east-1");
+//    properties.put("pinot.controller.segment.fetcher.protocols", 
"file,http,s3");
+//    properties.put("pinot.controller.segment.fetcher.s3.class", 
"org.apache.pinot.common.utils.fetcher.LiSegmentFetcher");
+//    properties.put("pinot.controller.storage.factory.s3.accessKey", 
"minioadmin");
+//    properties.put("pinot.controller.storage.factory.s3.secretKey", 
"minioadmin");
+
+    properties.put("pinot.controller.storage.factory.class.http", 
"org.apache.pinot.common.utils.filesystem.LiLocalPinotFS");
+
+
     overrideControllerConf(properties);
     return properties;
   }
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 21f3dc3c6b..ddd8670c16 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -166,7 +166,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   protected int getNumKafkaPartitions() {
-    return DEFAULT_LLC_NUM_KAFKA_PARTITIONS;
+    return DEFAULT_LLC_NUM_KAFKA_PARTITIONS; //1; 
//DEFAULT_LLC_NUM_KAFKA_PARTITIONS;
   }
 
   protected String getKafkaTopic() {
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index acf0b78d50..1cd735b2e3 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -327,7 +327,7 @@ public class ClusterIntegrationTestUtils {
       org.apache.pinot.spi.data.Schema schema, int segmentIndex, File 
segmentDir, File tarDir)
       throws Exception {
     // Test segment with space and special character in the file name
-    buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + " %", 
segmentDir, tarDir);
+    buildSegmentFromAvro(avroFile, tableConfig, schema, segmentIndex + "", 
segmentDir, tarDir);
   }
 
   /**
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 4e0366af87..de09cfcf6a 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -79,6 +79,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.intellij.lang.annotations.Language;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -242,6 +243,17 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected PinotConfiguration getServerConf(int serverId) {
     PinotConfiguration serverConf = new PinotConfiguration();
+    serverConf.setProperty("pinot.server.segment.fetcher.protocols", 
List.of("file", "http"));
+    serverConf.setProperty("pinot.server.segment.fetcher.file.class",
+        "org.apache.pinot.common.utils.fetcher.LiSegmentFetcher");
+    serverConf.setProperty("pinot.server.segment.fetcher.http.class",
+        "org.apache.pinot.common.utils.fetcher.LiSegmentFetcher");
+    serverConf.setProperty("pinot.server.storage.factory.class.http", 
"org.apache.pinot.common.utils.filesystem.LiLocalPinotFS");
+
+    // needed for realtime uploads
+    serverConf.setProperty("pinot.server.instance.segment.upload.to.deep.store", true);
+    serverConf.setProperty("pinot.server.instance.segment.store.uri", "http://localhost:20000/segments");
+
     serverConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
     serverConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, 
getHelixClusterName());
     serverConf.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, LOCAL_HOST);
@@ -458,15 +470,9 @@ public abstract class ClusterTest extends ControllerTest {
             .availableProcessors()));
         List<Future<Integer>> futures = new ArrayList<>(numSegments);
         for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> {
-            if (System.currentTimeMillis() % 2 == 0) {
-              return 
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(),
-                  segmentTarFile, getSegmentUploadAuthHeaders(), tableName, 
tableType).getStatusCode();
-            } else {
-              return uploadSegmentWithOnlyMetadata(tableName, tableType, 
uploadSegmentHttpURI, fileUploadDownloadClient,
-                  segmentTarFile);
-            }
-          }));
+          futures.add(executorService.submit(
+              () -> uploadSegmentWithOnlyMetadata(tableName, tableType, 
uploadSegmentHttpURI, fileUploadDownloadClient,
+                  segmentTarFile)));
         }
         executorService.shutdown();
         for (Future<Integer> future : futures) {
@@ -479,11 +485,19 @@ public abstract class ClusterTest extends ControllerTest {
   private int uploadSegmentWithOnlyMetadata(String tableName, TableType 
tableType, URI uploadSegmentHttpURI,
       FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
       throws IOException, HttpErrorStatusException {
+    String controllerSegmentsEndpoint = "http://localhost:20000/segments";
+    String fileName = URIUtils.encode(segmentTarFile.getName());
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String segmentName = fileName.substring(0, fileName.indexOf(".tar.gz"));
     List<Header> headers = new ArrayList<>(List.of(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
-        "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/"
-          + URIUtils.encode(segmentTarFile.getName())),
-      new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
-        FileUploadDownloadClient.FileUploadType.METADATA.toString())));
+            controllerSegmentsEndpoint + "/" + rawTableName + "/" + 
segmentName),
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+            FileUploadDownloadClient.FileUploadType.METADATA.toString())));
+//    List<Header> headers = new ArrayList<>(List.of(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+//            String.format("file://%s/%s", 
segmentTarFile.getParentFile().getAbsolutePath(),
+//                URIUtils.encode(segmentTarFile.getName()))),
+//        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+//            FileUploadDownloadClient.FileUploadType.METADATA.toString())));
     headers.addAll(getSegmentUploadAuthHeaders());
     // Add table name and table type as request parameters
     NameValuePair tableNameValuePair =
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 36af424482..8664c19263 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -257,7 +257,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     List<File> tarDirs = new ArrayList<>();
     tarDirs.add(_tarDir);
-    tarDirs.add(tarDir2);
+//    tarDirs.add(tarDir2);
     try {
       uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
     } catch (Exception e) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index eafba8a256..ee24387462 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -42,6 +42,7 @@ public class PinotFSFactory {
   private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, 
PinotFS>() {
     {
       put(LOCAL_PINOT_FS_SCHEME, new NoClosePinotFS(new LocalPinotFS()));
+//      put("http", new NoClosePinotFS(new LiLocalPinotFS()));
     }
   };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to