This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9046af27a1 Return interfaces in Segment processor framework classes
instead of Implementations (#14252)
9046af27a1 is described below
commit 9046af27a1917f75ce3c0dfa4a140a959112fb8e
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Oct 18 16:55:55 2024 +0530
Return interfaces in Segment processor framework classes instead of
Implementations (#14252)
* change abstraction for writer.
* fix segment fetcher for localfs.
---------
Co-authored-by: Aishik <[email protected]>
---
.../apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java | 6 +++++-
.../apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java | 3 +++
.../core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java | 4 ++--
.../apache/pinot/core/segment/processing/mapper/SegmentMapper.java | 4 ++--
4 files changed, 12 insertions(+), 5 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
index b0bb4706a3..c11a0239c1 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java
@@ -28,6 +28,10 @@ public class PinotFSSegmentFetcher extends
BaseSegmentFetcher {
@Override
protected void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
throws Exception {
- PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
+ if (uri.getScheme() == null) {
+
PinotFSFactory.create(PinotFSFactory.LOCAL_PINOT_FS_SCHEME).copyToLocalFile(uri,
dest);
+ } else {
+ PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
+ }
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 543db8c403..235c63bf0c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -106,6 +106,9 @@ public class SegmentFetcherFactory {
return segmentFetcher;
} else {
LOGGER.info("Segment fetcher is not configured for protocol: {}, using
default", protocol);
+ if (protocol == null) {
+ return PINOT_FS_SEGMENT_FETCHER;
+ }
switch (protocol) {
case CommonConstants.HTTP_PROTOCOL:
case CommonConstants.HTTPS_PROTOCOL:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
index 541bd14e26..d7db6509f3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.pinot.spi.data.readers.GenericRow;
-public class AdaptiveSizeBasedWriter implements
AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
+public class AdaptiveSizeBasedWriter implements
AdaptiveConstraintsWriter<FileWriter<GenericRow>, GenericRow> {
private final long _bytesLimit;
private long _numBytesWritten;
@@ -45,7 +45,7 @@ public class AdaptiveSizeBasedWriter implements
AdaptiveConstraintsWriter<Generi
}
@Override
- public void write(GenericRowFileWriter writer, GenericRow row) throws
IOException {
+ public void write(FileWriter<GenericRow> writer, GenericRow row) throws
IOException {
_numBytesWritten += writer.writeData(row);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 49ae88b19f..cca0a0ef58 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -31,8 +31,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import
org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter;
+import org.apache.pinot.core.segment.processing.genericrow.FileWriter;
import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
-import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
@@ -245,7 +245,7 @@ public class SegmentMapper {
}
// Get the file writer.
- GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ FileWriter<GenericRow> fileWriter = fileManager.getFileWriter();
// Write the row.
_adaptiveSizeBasedWriter.write(fileWriter, row);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]