http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 9d0c933..b76722b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -279,7 +279,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { this.carbonDataFileName = CarbonTablePath .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(), model.getBucketId(), model.getTaskExtension(), - "" + model.getCarbonDataFileAttributes().getFactTimeStamp()); + "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId()); this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator + carbonDataFileName; try { @@ -368,7 +368,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(), model.getBucketId(), model.getTaskExtension(), - "" + model.getCarbonDataFileAttributes().getFactTimeStamp()); + "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId()); indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS); } else { // randomly choose a temp location for index file @@ -378,7 +378,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { indexFileName = chosenTempLocation + File.separator + CarbonTablePath .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(), model.getBucketId(), model.getTaskExtension(), - "" + model.getCarbonDataFileAttributes().getFactTimeStamp()); + "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId()); } CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 0ea7223..da77cf6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -601,7 +601,9 @@ public final class CarbonLoaderUtil { long sizePerNode = 0; long totalFileSize = 0; if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) { - sizePerNode = blockInfos.size() / noofNodes; + if (blockInfos.size() > 0) { + sizePerNode = blockInfos.size() / noofNodes; + } sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode; } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy || BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---------------------------------------------------------------------- diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java index f6406c7..4bfadce 100644 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java +++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java @@ -37,15 +37,18 @@ import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.model.QueryModelBuilder; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonTaskInfo; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; import org.apache.carbondata.hadoop.CarbonRecordReader; @@ -164,12 +167,15 @@ public class SearchRequestHandler { Objects.requireNonNull(datamap); List<Segment> segments = new LinkedList<>(); HashMap<String, Integer> uniqueSegments = new HashMap<>(); + LoadMetadataDetails[] loadMetadataDetails = + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); for (CarbonInputSplit split : mbSplit.getAllSplits()) { - String segmentId = split.getSegmentId(); + String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString(); if (uniqueSegments.get(segmentId) == null) { - segments.add(Segment.toSegment( - segmentId, - new LatestFilesReadCommittedScope(table.getTablePath(), segmentId))); + segments.add(Segment.toSegment(segmentId, + new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), + loadMetadataDetails))); uniqueSegments.put(segmentId, 1); } else { uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1); http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java index 4653445..bd622f0 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -128,7 +128,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { segmentDir = CarbonTablePath.getSegmentPath( carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId); - fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0"); + fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId); } private void initializeAtFirstRow() throws IOException, InterruptedException {