http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java new file mode 100644 index 0000000..fd9c0ef --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.stats; + +/** + * Contract for recording query statistics and writing them to the log. + * Implementations may be no-ops (QueryStatisticsRecorderDummy has empty method bodies). + */ +public interface QueryStatisticsRecorder { + + /** + * Records one query statistic. + * + * @param statistic the statistic to record + */ + void recordStatistics(QueryStatistic statistic); + + /** + * Logs the statistics recorded so far. + */ + void logStatistics(); + + /** + * Logs the recorded statistics in table form. + * NOTE(review): "executor" scope is taken from the method name only - confirm. + */ + void logStatisticsAsTableExecutor(); + + /** + * Records one statistic together with a query id. + * NOTE(review): presumably the driver-side counterpart of recordStatistics; both + * implementations added in this commit leave the body empty - confirm intended use. + * + * @param statistic the statistic to record + * @param queryId id of the query the statistic belongs to + */ + void recordStatisticsForDriver(QueryStatistic statistic, String queryId); + + /** + * Logs statistics in table form. + * NOTE(review): presumably the driver-side counterpart of logStatisticsAsTableExecutor; + * both implementations added in this commit leave the body empty - confirm intended use. + */ + void logStatisticsAsTableDriver(); + +}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java new file mode 100644 index 0000000..a751936 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.stats; + +import java.io.Serializable; + +/** + * No-op implementation of QueryStatisticsRecorder: every method body is empty, + * so nothing is recorded and nothing is logged. + */ +public class QueryStatisticsRecorderDummy implements QueryStatisticsRecorder,Serializable { + + /** + * serialization version + */ + private static final long serialVersionUID = -5719752001674467864L; + + public QueryStatisticsRecorderDummy() { + + } + + /** + * No-op: the given statistic is discarded. + * + * @param statistic ignored + */ + public synchronized void recordStatistics(QueryStatistic statistic) { + + } + + /** + * No-op: nothing is logged. + */ + public void logStatistics() { + + } + + /** + * No-op: no statistics table is logged. + */ + public void logStatisticsAsTableExecutor() { + + } + + /** + * No-op: the statistic and the query id are ignored. + */ + public void recordStatisticsForDriver(QueryStatistic statistic, String queryId) { + + } + + /** + * No-op: no statistics table is logged. + */ + public void logStatisticsAsTableDriver() { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java new file mode 100644 index 0000000..9cfeab4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.stats; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; + +import static org.apache.carbondata.core.util.CarbonUtil.printLine; + +/** + * Keeps the recorded query statistics in an in-memory list and writes them to the + * log, either one entry at a time or as a single formatted table. + */ +public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder,Serializable { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(QueryStatisticsRecorderImpl.class.getName()); + + /** + * serialization version + */ + private static final long serialVersionUID = -5719752001674467864L; + + /** + * list for statistics to record time taken + * by each phase of the query for example aggregation + * scanning,block loading time etc. 
+ */ + private List<QueryStatistic> queryStatistics; + + /** + * identifier supplied to the constructor; it is passed to each statistic when + * logging and printed in the task_id column of the statistics table + */ + private String queryIWthTask; + + /** + * Creates a recorder with an empty statistics list. + * + * @param queryId identifier stored in queryIWthTask + */ + public QueryStatisticsRecorderImpl(String queryId) { + queryStatistics = new ArrayList<QueryStatistic>(); + this.queryIWthTask = queryId; + } + + /** + * Appends the given statistic to the recorded list. + * + * @param statistic statistic to add + */ + public synchronized void recordStatistics(QueryStatistic statistic) { + queryStatistics.add(statistic); + } + + /** + * Logs every recorded statistic individually through LOGGER.statistic. + * NOTE(review): iterates queryStatistics without the lock taken by recordStatistics - + * confirm callers never log while statistics are still being recorded. + */ + public void logStatistics() { + for (QueryStatistic statistic : queryStatistics) { + LOGGER.statistic(statistic.getStatistics(queryIWthTask)); + } + } + + /** + * Logs the text returned by collectExecutorStatistics() through LOGGER.statistic. + */ + public void logStatisticsAsTableExecutor() { + String tableInfo = collectExecutorStatistics(); + if (null != tableInfo) { + LOGGER.statistic(tableInfo); + } + } + + /** + * Groups the recorded statistics by their message constant and renders them as a + * one-row text table (header row plus value row, right-aligned, framed by +---+ lines). + * Values of the same kind are concatenated separated by a space, except + * total_blocklet and valid_scan_blocklet, where a later value replaces the earlier one. + * NOTE(review): confirm that the replace-instead-of-append behaviour is intended. + * + * @return the table text preceded by a heading line, or a failure message containing + * the exception message if anything goes wrong while building the table + */ + public String collectExecutorStatistics() { + String load_blocks_time = ""; + String scan_blocks_time = ""; + String scan_blocks_num = ""; + String load_dictionary_time = ""; + String result_size = ""; + String total_executor_time = ""; + String splitChar = " "; + String total_blocklet = ""; + String valid_scan_blocklet = ""; + try { + for (QueryStatistic statistic : queryStatistics) { + switch (statistic.getMessage()) { + case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR: + load_blocks_time += statistic.getTimeTaken() + splitChar; + break; + case QueryStatisticsConstants.SCAN_BLOCKS_TIME: + scan_blocks_time += statistic.getTimeTaken() + splitChar; + break; + case QueryStatisticsConstants.SCAN_BLOCKS_NUM: + scan_blocks_num += statistic.getCount() + splitChar; + break; + case QueryStatisticsConstants.LOAD_DICTIONARY: + load_dictionary_time += statistic.getTimeTaken() + splitChar; + break; + case QueryStatisticsConstants.RESULT_SIZE: + result_size += statistic.getCount() + 
splitChar; + break; + case QueryStatisticsConstants.EXECUTOR_PART: + total_executor_time += statistic.getTimeTaken() + splitChar; + break; + case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM: + total_blocklet = statistic.getCount() + splitChar; + break; + case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM: + valid_scan_blocklet = statistic.getCount() + splitChar; + break; + default: + break; + } + } + String headers = "task_id,load_blocks_time,load_dictionary_time,scan_blocks_time," + + "total_executor_time,scan_blocks_num,total_blocklet," + + "valid_scan_blocklet,result_size"; + List<String> values = new ArrayList<String>(); + values.add(queryIWthTask); + values.add(load_blocks_time); + values.add(load_dictionary_time); + values.add(scan_blocks_time); + values.add(total_executor_time); + values.add(scan_blocks_num); + values.add(total_blocklet); + values.add(valid_scan_blocklet); + values.add(result_size); + StringBuilder tableInfo = new StringBuilder(); + String[] columns = headers.split(","); + String line = ""; + String hearLine = ""; + String valueLine = ""; + for (int i = 0; i < columns.length; i++) { + int len = Math.max(columns[i].length(), values.get(i).length()); + line += "+" + printLine("-", len); + hearLine += "|" + printLine(" ", len - columns[i].length()) + columns[i]; + valueLine += "|" + printLine(" ", len - values.get(i).length()) + values.get(i); + } + // assemble the table: border, header row, border, value row, border + tableInfo.append(line + "+").append("\n"); + tableInfo.append(hearLine + "|").append("\n"); + tableInfo.append(line + "+").append("\n"); + tableInfo.append(valueLine + "|").append("\n"); + tableInfo.append(line + "+").append("\n"); + return "Print query statistic for each task id:" + "\n" + tableInfo.toString(); + } catch (Exception ex) { + return "Put statistics into table failed, catch exception: " + ex.getMessage(); + } + } + + /** + * Empty in this recorder: the statistic and the query id are ignored. + */ + public void recordStatisticsForDriver(QueryStatistic statistic, String queryId) { + + } + + /** + * Empty in this recorder: nothing is logged. + */ + public void logStatisticsAsTableDriver() { + + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java new file mode 100644 index 0000000..70043d4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.statusmanager; + +import java.io.Serializable; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; + +/** + * Details of one load: its name, status and timestamps. All values, including the + * timestamps, are held as strings; the long-based accessors convert on the fly. + * Two instances are equal when their load names are equal. + */ +public class LoadMetadataDetails implements Serializable { + + private static final long serialVersionUID = 1106104914918491724L; + private String timestamp; + private String loadStatus; + private String loadName; + private String partitionCount; + private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE; + + // update delta end timestamp + private String updateDeltaEndTimestamp = ""; + + // update delta start timestamp + private String updateDeltaStartTimestamp = ""; + + // this will represent the update status file name at that point of time. + private String updateStatusFileName = ""; + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LoadMetadataDetails.class.getName()); + + // dont remove static as the write will fail. + // SimpleDateFormat is not thread-safe: getTimeStamp synchronizes on this shared instance. + private static final SimpleDateFormat parser = + new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); + /** + * Segment modification or deletion time stamp + */ + private String modificationOrdeletionTimesStamp; + private String loadStartTime; + + private String mergedLoadName; + /** + * visibility is used to determine whether to the load is visible or not. + */ + private String visibility = "true"; + + /** + * To know if the segment is a major compacted segment or not. 
+ */ + private String majorCompacted; + + public String getPartitionCount() { + return partitionCount; + } + + public void setPartitionCount(String partitionCount) { + this.partitionCount = partitionCount; + } + + /** + * @return the load end time in epoch milliseconds, or 0 if it is not set or cannot be parsed + */ + public long getLoadEndTime() { + return convertTimeStampToLong(timestamp); + } + + /** + * @param timestamp load end time in epoch milliseconds; stored as a formatted string + */ + public void setLoadEndTime(long timestamp) { + this.timestamp = getTimeStampConvertion(timestamp); + } + + public String getLoadStatus() { + return loadStatus; + } + + public void setLoadStatus(String loadStatus) { + this.loadStatus = loadStatus; + } + + public String getLoadName() { + return loadName; + } + + public void setLoadName(String loadName) { + this.loadName = loadName; + } + + /** + * @return the modificationOrdeletionTimesStamp + */ + public long getModificationOrdeletionTimesStamp() { + if(null == modificationOrdeletionTimesStamp) { + return 0; + } + return convertTimeStampToLong(modificationOrdeletionTimesStamp); + } + + /** + * @param modificationOrdeletionTimesStamp the modificationOrdeletionTimesStamp to set + */ + public void setModificationOrdeletionTimesStamp(long modificationOrdeletionTimesStamp) { + this.modificationOrdeletionTimesStamp = + getTimeStampConvertion(modificationOrdeletionTimesStamp); + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((loadName == null) ? 
0 : loadName.hashCode()); + return result; + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override public boolean equals(Object obj) { + if (obj == null) { + return false; + + } + if (!(obj instanceof LoadMetadataDetails)) { + return false; + } + LoadMetadataDetails other = (LoadMetadataDetails) obj; + if (loadName == null) { + if (other.loadName != null) { + return false; + } + } else if (!loadName.equals(other.loadName)) { + return false; + } + return true; + } + + /** + * @return the load start time in epoch milliseconds, or 0 if it is not set or cannot be parsed + */ + public long getLoadStartTime() { + return convertTimeStampToLong(loadStartTime); + } + + /** + * Returns the load start time as produced by getTimeStamp (milliseconds multiplied by 1000). + * + * @return the converted start time, or 0 when the start time is unset, empty or unparseable + */ + public long getLoadStartTimeAsLong() { + if (null == loadStartTime || loadStartTime.isEmpty()) { + return 0; + } + Long startTime = getTimeStamp(loadStartTime); + // getTimeStamp returns null when parsing fails; do not unbox it blindly + return (null == startTime) ? 0 : startTime; + } + + /** + * Parses a formatted timestamp string into epoch milliseconds. The pattern + * CARBON_TIMESTAMP_MILLIS is tried first, then CARBON_TIMESTAMP as a fallback. + * + * @param factTimeStamp formatted timestamp, may be null + * @return the parsed time in milliseconds, or 0 if the value is null or matches neither pattern + */ + private long convertTimeStampToLong(String factTimeStamp) { + // same result as the explicit null check in getModificationOrdeletionTimesStamp + if (null == factTimeStamp) { + return 0; + } + SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS); + Date dateToStr = null; + try { + dateToStr = parser.parse(factTimeStamp); + return dateToStr.getTime(); + } catch (ParseException e) { + LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage()); + parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); + try { + dateToStr = parser.parse(factTimeStamp); + return dateToStr.getTime(); + } catch (ParseException e1) { + LOGGER + .error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e1.getMessage()); + return 0; + } + } + } + + /** + * Parses the given string with the CARBON_TIMESTAMP pattern. + * + * @param loadStartTime formatted timestamp, may be null + * @return the parsed time in milliseconds multiplied by 1000, or null if the value is + * null or cannot be parsed + */ + public Long getTimeStamp(String loadStartTime) { + if (null == loadStartTime) { + return null; + } + Date dateToStr = null; + try { + // the shared static parser is not thread-safe, so serialize access to it + synchronized (parser) { + dateToStr = parser.parse(loadStartTime); + } + return dateToStr.getTime() * 1000; + } catch (ParseException e) { + LOGGER.error("Cannot convert" + loadStartTime + " to 
Time/Long type value" + e.getMessage()); + return null; + } + } + + /** + * Formats epoch milliseconds with the CARBON_TIMESTAMP_MILLIS pattern. + */ + private String getTimeStampConvertion(long time) { + SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS); + return sdf.format(time); + } + + /** + * @param loadStartTime load start time in epoch milliseconds; stored as a formatted string + */ + public void setLoadStartTime(long loadStartTime) { + this.loadStartTime = getTimeStampConvertion(loadStartTime); + } + + /** + * @return the mergedLoadName + */ + public String getMergedLoadName() { + return mergedLoadName; + } + + /** + * @param mergedLoadName the mergedLoadName to set + */ + public void setMergedLoadName(String mergedLoadName) { + this.mergedLoadName = mergedLoadName; + } + + /** + * @return the visibility + */ + public String getVisibility() { + return visibility; + } + + /** + * @param visibility the visibility to set + */ + public void setVisibility(String visibility) { + this.visibility = visibility; + } + + /** + * Return true if it is a major compacted segment. + * @return majorCompacted + */ + public String isMajorCompacted() { + return majorCompacted; + } + + /** + * Set true if it is a major compacted segment. + * + * @param majorCompacted + */ + public void setMajorCompacted(String majorCompacted) { + this.majorCompacted = majorCompacted; + } + + /** + * To get isDeleted property. + * + * @return isDeleted + */ + public String getIsDeleted() { + return isDeleted; + } + + /** + * To set isDeleted property. 
+ * + * @param isDeleted + */ + public void setIsDeleted(String isDeleted) { + this.isDeleted = isDeleted; + } + + /** + * To get the update delta end timestamp + * + * @return updateDeltaEndTimestamp + */ + public String getUpdateDeltaEndTimestamp() { + return updateDeltaEndTimestamp; + } + + /** + * To set the update delta end timestamp + * + * @param updateDeltaEndTimestamp + */ + public void setUpdateDeltaEndTimestamp(String updateDeltaEndTimestamp) { + this.updateDeltaEndTimestamp = updateDeltaEndTimestamp; + } + + /** + * To get the update delta start timestamp + * + * @return updateDeltaStartTimestamp + */ + public String getUpdateDeltaStartTimestamp() { + return updateDeltaStartTimestamp; + } + + /** + * To set the update delta start timestamp + * + * @param updateDeltaStartTimestamp + */ + public void setUpdateDeltaStartTimestamp(String updateDeltaStartTimestamp) { + this.updateDeltaStartTimestamp = updateDeltaStartTimestamp; + } + + /** + * To get the updateStatusFileName + * + * @return updateStatusFileName + */ + public String getUpdateStatusFileName() { + return updateStatusFileName; + } + + /** + * To set the updateStatusFileName + * + * @param updateStatusFileName + */ + public void setUpdateStatusFileName(String updateStatusFileName) { + this.updateStatusFileName = updateStatusFileName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java new file mode 100644 index 0000000..1458d48 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.statusmanager; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.locks.CarbonLockFactory; +import org.apache.carbondata.core.locks.CarbonLockUtil; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.locks.LockUsage; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; 
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; + +/** + * Manages Load/Segment status + */ +public class SegmentStatusManager { + + private static final LogService LOG = + LogServiceFactory.getLogService(SegmentStatusManager.class.getName()); + + private AbsoluteTableIdentifier absoluteTableIdentifier; + + public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + } + + /** + * This will return the lock object used to lock the table status file before updation. + * + * @return + */ + public ICarbonLock getTableStatusLock() { + return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(), + LockUsage.TABLE_STATUS_LOCK); + } + + /** + * This method will return last modified time of tablestatus file + */ + public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier) + throws IOException { + String tableStatusPath = CarbonStorePath + .getCarbonTablePath(identifier.getStorePath(), identifier.getCarbonTableIdentifier()) + .getTableStatusFilePath(); + if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) { + return 0L; + } else { + return FileFactory.getCarbonFile(tableStatusPath, FileFactory.getFileType(tableStatusPath)) + .getLastModifiedTime(); + } + } + + public List<Long> getUpdateDeltaStartEndTimeStamp(final String loadIds, + LoadMetadataDetails[] listOfLoadFolderDetailsArray) { + List<Long> updateDeltaStartEndTimestamp = new ArrayList<>(); + for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { + if (loadIds.equalsIgnoreCase(loadMetadata.getLoadName())) { + // Make sure the Load is not compacted and not marked for delete. 
+ if (CarbonCommonConstants.COMPACTED + .equalsIgnoreCase(loadMetadata.getLoadStatus())) { + return null; + } else if (CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) { + return null; + } + else { + return updateDeltaStartEndTimestamp; + } + } + } + return null; + } + + + /** + * get valid segment for given table + * + * @return + * @throws IOException + */ + public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException { + + // @TODO: move reading LoadStatus file to separate class + List<String> listOfValidSegments = new ArrayList<String>(10); + List<String> listOfValidUpdatedSegments = new ArrayList<String>(10); + List<String> listOfInvalidSegments = new ArrayList<String>(10); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + String dataPath = carbonTablePath.getTableStatusFilePath(); + DataInputStream dataInputStream = null; + Gson gsonObjectToRead = new Gson(); + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath)); + LoadMetadataDetails[] loadFolderDetailsArray; + try { + if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) { + dataInputStream = fileOperation.openForRead(); + BufferedReader buffReader = + new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8")); + loadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class); + //just directly iterate Array + List<LoadMetadataDetails> loadFolderDetails = Arrays.asList(loadFolderDetailsArray); + for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) { + if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()) + || CarbonCommonConstants.MARKED_FOR_UPDATE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()) + || 
CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) { + // check for merged loads. + if (null != loadMetadataDetails.getMergedLoadName()) { + if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) { + listOfValidSegments.add(loadMetadataDetails.getMergedLoadName()); + } + // if merged load is updated then put it in updated list + if (CarbonCommonConstants.MARKED_FOR_UPDATE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) { + listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName()); + } + continue; + } + + if (CarbonCommonConstants.MARKED_FOR_UPDATE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) { + + listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName()); + } + listOfValidSegments.add(loadMetadataDetails.getLoadName()); + } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()) + || CarbonCommonConstants.COMPACTED + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()) + || CarbonCommonConstants.MARKED_FOR_DELETE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) { + listOfInvalidSegments.add(loadMetadataDetails.getLoadName()); + } + } + } + } catch (IOException e) { + LOG.error(e); + throw e; + } finally { + try { + if (null != dataInputStream) { + dataInputStream.close(); + } + } catch (Exception e) { + LOG.error(e); + throw e; + } + } + return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments, + listOfInvalidSegments); + } + + /** + * This method reads the load metadata file + * + * @param tableFolderPath + * @return + */ + public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + String metadataFileName = tableFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + 
CarbonCommonConstants.LOADMETADATA_FILENAME; + LoadMetadataDetails[] listOfLoadFolderDetailsArray; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(metadataFileName, FileFactory.getFileType(metadataFileName)); + + try { + if (!FileFactory.isFileExist(metadataFileName, FileFactory.getFileType(metadataFileName))) { + return new LoadMetadataDetails[0]; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + listOfLoadFolderDetailsArray = + gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class); + } catch (IOException e) { + return new LoadMetadataDetails[0]; + } finally { + closeStreams(buffReader, inStream, dataInputStream); + } + + return listOfLoadFolderDetailsArray; + } + + /** + * compares two given date strings + * + * @param loadValue + * @param userValue + * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg, + * 0 otherwise + */ + private static Integer compareDateValues(Long loadValue, Long userValue) { + return loadValue.compareTo(userValue); + } + + /** + * updates deletion status + * + * @param loadIds + * @param tableFolderPath + * @return + */ + public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier, + List<String> loadIds, String tableFolderPath) throws Exception { + CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier(); + ICarbonLock carbonDeleteSegmentLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK); + ICarbonLock carbonTableStatusLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK); + String tableDetails = + carbonTableIdentifier.getDatabaseName() + "." 
+ carbonTableIdentifier.getTableName(); + List<String> invalidLoadIds = new ArrayList<String>(0); + try { + if (carbonDeleteSegmentLock.lockWithRetries()) { + LOG.info("Delete segment lock has been successfully acquired"); + + CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath( + identifier.getStorePath(), identifier.getCarbonTableIdentifier()); + String dataLoadLocation = carbonTablePath.getTableStatusFilePath(); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = null; + if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) { + // log error. + LOG.error("Load metadata file is not present."); + return loadIds; + } + // read existing metadata details in load metadata. + listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); + if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) { + updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds); + if (invalidLoadIds.isEmpty()) { + // All or None , if anything fails then dont write + if(carbonTableStatusLock.lockWithRetries()) { + LOG.info("Table status lock has been successfully acquired"); + // To handle concurrency scenarios, always take latest metadata before writing + // into status file. + LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath); + updateLatestTableStatusDetails(listOfLoadFolderDetailsArray, + latestLoadMetadataDetails); + writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); + } + else { + String errorMsg = "Delete segment by id is failed for " + tableDetails + + ". Not able to acquire the table status lock due to other operation running " + + "in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); + } + + } else { + return invalidLoadIds; + } + + } else { + LOG.audit("Delete segment by Id is failed. 
No matching segment id found."); + return loadIds; + } + + } else { + String errorMsg = "Delete segment by id is failed for " + tableDetails + + ". Not able to acquire the delete segment lock due to another delete " + + "operation is running in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); + } + } catch (IOException e) { + LOG.error("IOException" + e.getMessage()); + } finally { + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); + CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK); + } + + return invalidLoadIds; + } + + /** + * updates deletion status + * + * @param loadDate + * @param tableFolderPath + * @return + */ + public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier, + String loadDate, String tableFolderPath, Long loadStartTime) throws Exception { + CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier(); + ICarbonLock carbonDeleteSegmentLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK); + ICarbonLock carbonTableStatusLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK); + String tableDetails = + carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName(); + List<String> invalidLoadTimestamps = new ArrayList<String>(0); + try { + if (carbonDeleteSegmentLock.lockWithRetries()) { + LOG.info("Delete segment lock has been successfully acquired"); + + CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath( + identifier.getStorePath(), identifier.getCarbonTableIdentifier()); + String dataLoadLocation = carbonTablePath.getTableStatusFilePath(); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = null; + + if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) { + // log error. 
+ LOG.error("Error message: " + "Load metadata file is not present."); + invalidLoadTimestamps.add(loadDate); + return invalidLoadTimestamps; + } + // read existing metadata details in load metadata. + listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); + if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) { + updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps, + loadStartTime); + if (invalidLoadTimestamps.isEmpty()) { + if(carbonTableStatusLock.lockWithRetries()) { + LOG.info("Table status lock has been successfully acquired."); + // To handle concurrency scenarios, always take latest metadata before writing + // into status file. + LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath); + updateLatestTableStatusDetails(listOfLoadFolderDetailsArray, + latestLoadMetadataDetails); + writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); + } + else { + + String errorMsg = "Delete segment by date is failed for " + tableDetails + + ". Not able to acquire the table status lock due to other operation running " + + "in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); + + } + } else { + return invalidLoadTimestamps; + } + + } else { + LOG.audit("Delete segment by date is failed. No matching segment found."); + invalidLoadTimestamps.add(loadDate); + return invalidLoadTimestamps; + } + + } else { + String errorMsg = "Delete segment by date is failed for " + tableDetails + + ". 
Not able to acquire the delete segment lock due to another delete " + + "operation is running in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); + } + } catch (IOException e) { + LOG.error("Error message: " + "IOException" + e.getMessage()); + } finally { + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); + CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK); + } + + return invalidLoadTimestamps; + } + + /** + * writes load details into a given file at @param dataLoadLocation + * + * @param dataLoadLocation + * @param listOfLoadFolderDetailsArray + * @throws IOException + */ + public static void writeLoadDetailsIntoFile(String dataLoadLocation, + LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + // write the updated data into the metadata file. 
+ + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray); + brWriter.write(metadataInstance); + } catch (IOException ioe) { + LOG.error("Error message: " + ioe.getLocalizedMessage()); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + + } + + /** + * updates deletion status details for each load and returns invalidLoadIds + * + * @param loadIds + * @param listOfLoadFolderDetailsArray + * @param invalidLoadIds + * @return invalidLoadIds + */ + private static List<String> updateDeletionStatus(List<String> loadIds, + LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) { + for (String loadId : loadIds) { + boolean loadFound = false; + // For each load id loop through data and if the + // load id is found then mark + // the metadata as deleted. + for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { + + if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) { + // if the segment is compacted then no need to delete that. + if (CarbonCommonConstants.COMPACTED + .equalsIgnoreCase(loadMetadata.getLoadStatus())) { + LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId); + invalidLoadIds.add(loadId); + return invalidLoadIds; + } + if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) { + loadFound = true; + loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE); + loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime()); + LOG.info("Segment ID " + loadId + " Marked for Delete"); + } + break; + } + } + + if (!loadFound) { + LOG.audit("Delete segment by ID is failed. 
No matching segment id found :" + loadId); + invalidLoadIds.add(loadId); + return invalidLoadIds; + } + } + return invalidLoadIds; + } + + /** + * updates deletion status details for load and returns invalidLoadTimestamps + * + * @param loadDate + * @param listOfLoadFolderDetailsArray + * @param invalidLoadTimestamps + * @return invalidLoadTimestamps + */ + public static List<String> updateDeletionStatus(String loadDate, + LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps, + Long loadStartTime) { + // For each load timestamp loop through data and if the + // required load timestamp is found then mark + // the metadata as deleted. + boolean loadFound = false; + String loadStartTimeString = "Load Start Time: "; + for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { + Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime); + if (result < 0) { + if (CarbonCommonConstants.COMPACTED + .equalsIgnoreCase(loadMetadata.getLoadStatus())) { + LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() + + "as the segment has been compacted."); + continue; + } + if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) { + loadFound = true; + updateSegmentMetadataDetails(loadMetadata); + LOG.info("Info: " + + loadStartTimeString + loadMetadata.getLoadStartTime() + + " Marked for Delete"); + } + } + + } + + if (!loadFound) { + invalidLoadTimestamps.add(loadDate); + LOG.audit("Delete segment by date is failed. No matching segment found."); + return invalidLoadTimestamps; + } + return invalidLoadTimestamps; + } + + /** + * This method closes the streams + * + * @param streams - streams to close. + */ + private static void closeStreams(Closeable... 
streams) { + // Added if to avoid NullPointerException in case one stream is being passed as null + if (null != streams) { + for (Closeable stream : streams) { + if (null != stream) { + try { + stream.close(); + } catch (IOException e) { + LOG.error("Error while closing stream" + stream); + } + } + } + } + } + + /** + * updates table status details using latest metadata + * + * @param oldMetadata + * @param newMetadata + * @return + */ + + public static List<LoadMetadataDetails> updateLatestTableStatusDetails( + LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) { + + List<LoadMetadataDetails> newListMetadata = + new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata)); + for (LoadMetadataDetails oldSegment : oldMetadata) { + if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oldSegment.getLoadStatus())) { + updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment))); + } + } + return newListMetadata; + } + + /** + * updates segment status and modificaton time details + * + * @param loadMetadata + */ + public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) { + // update status only if the segment is not marked for delete + if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) { + loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE); + loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime()); + } + } + + /** + * This API will return the update status file name. + * @param segmentList + * @return + */ + public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) { + if(segmentList.length == 0) { + return ""; + } + else { + for(LoadMetadataDetails eachSeg : segmentList) { + // file name stored in 0th segment. 
+ if (eachSeg.getLoadName().equalsIgnoreCase("0")) { + return eachSeg.getUpdateStatusFileName(); + } + } + } + return ""; + } + + /** + * getting the task numbers present in the segment. + * @param segmentId + * @return + */ + public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager + updateStatusManager) { + List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId); + for (String eachFileName : list) { + taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName)); + } + return taskList; + } + + + public static class ValidAndInvalidSegmentsInfo { + private final List<String> listOfValidSegments; + private final List<String> listOfValidUpdatedSegments; + private final List<String> listOfInvalidSegments; + + private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments, + List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) { + this.listOfValidSegments = listOfValidSegments; + this.listOfValidUpdatedSegments = listOfValidUpdatedSegments; + this.listOfInvalidSegments = listOfInvalidUpdatedSegments; + } + public List<String> getInvalidSegments() { + return listOfInvalidSegments; + } + public List<String> getValidSegments() { + return listOfValidSegments; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java new file mode 100644 index 0000000..f14485b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -0,0 +1,990 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.statusmanager; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.locks.CarbonLockFactory; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.locks.LockUsage; +import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.mutate.SegmentUpdateDetails; +import org.apache.carbondata.core.mutate.TupleIdEnum; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; + +/** + * Manages Segment & block status of carbon table for Delete operation + */ +public class SegmentUpdateStatusManager { + + /** + * logger + */ + private static final LogService LOG = + LogServiceFactory.getLogService(SegmentUpdateStatusManager.class.getName()); + + private AbsoluteTableIdentifier absoluteTableIdentifier; + private LoadMetadataDetails[] segmentDetails; + private SegmentUpdateDetails[] updateDetails; + private CarbonTablePath carbonTablePath; + private Map<String, SegmentUpdateDetails> blockAndDetailsMap; + + /** + * @param absoluteTableIdentifier + */ + public SegmentUpdateStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + // current it is used only for read function scenarios, as file update always requires to work + // on latest file status. + segmentDetails = + segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + updateDetails = readLoadMetadata(); + populateMap(); + } + + /** + * populate the block and its details in a map. 
+ */ + private void populateMap() { + blockAndDetailsMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + for(SegmentUpdateDetails blockDetails : updateDetails) { + + String blockIdentifier = CarbonUpdateUtil + .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName()); + + blockAndDetailsMap.put(blockIdentifier, blockDetails); + + } + + } + + /** + * + * @param segID + * @param actualBlockName + * @return null if block is not present in segment update status. + */ + public SegmentUpdateDetails getDetailsForABlock(String segID, String actualBlockName) { + + String blockIdentifier = CarbonUpdateUtil + .getSegmentBlockNameKey(segID, actualBlockName); + + return blockAndDetailsMap.get(blockIdentifier); + + } + + /** + * + * @param key will be like (segid/blockname) 0/0-0-5464654654654 + * @return + */ + public SegmentUpdateDetails getDetailsForABlock(String key) { + + return blockAndDetailsMap.get(key); + + } + + + + /** + * Returns the LoadMetadata Details + * @return + */ + public LoadMetadataDetails[] getLoadMetadataDetails() { + return segmentDetails; + } + + /** + * + * @param loadMetadataDetails + */ + public void setLoadMetadataDetails(LoadMetadataDetails[] loadMetadataDetails) { + this.segmentDetails = loadMetadataDetails; + } + + /** + * Returns the UpdateStatus Details. + * @return + */ + public SegmentUpdateDetails[] getUpdateStatusDetails() { + return updateDetails; + } + + /** + * + * @param segmentUpdateDetails + */ + public void setUpdateStatusDetails(SegmentUpdateDetails[] segmentUpdateDetails) { + this.updateDetails = segmentUpdateDetails; + } + + /** + * This will return the lock object used to lock the table update status file before updation. 
+ * + * @return + */ + public ICarbonLock getTableUpdateStatusLock() { + return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(), + LockUsage.TABLE_UPDATE_STATUS_LOCK); + } + + /** + * Returns all delete delta files of specified block + * + * @param tupleId + * @return + * @throws Exception + */ + public List<String> getDeleteDeltaFiles(String tupleId) throws Exception { + return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT); + } + + + /** + * Returns all update delta files of specified Segment. + * + * @param segmentId + * @return + * @throws Exception + */ + public List<String> getUpdateDeltaFiles(final String segmentId) { + List<String> updatedDeltaFilesList = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + String endTimeStamp = ""; + String startTimeStamp = ""; + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + CarbonFile segDir = + FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); + for (LoadMetadataDetails eachSeg : segmentDetails) { + if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) { + // if the segment is found then take the start and end time stamp. + startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp(); + endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp(); + } + } + // if start timestamp is empty then no update delta is found. so return empty list. + if (startTimeStamp.isEmpty()) { + return updatedDeltaFilesList; + } + final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp); + final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp); + + // else scan the segment for the delta files with the respective timestamp. 
+ CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile pathName) { + String fileName = pathName.getName(); + if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) { + String firstPart = fileName.substring(0, fileName.indexOf('.')); + + long timestamp = Long.parseLong(firstPart + .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + firstPart.length())); + if (Long.compare(timestamp, endTimeStampFinal) <= 0 + && Long.compare(timestamp, startTimeStampFinal) >= 0) { + + // if marked for delete then it is invalid. + if (!isBlockValid(segmentId, fileName)) { + return false; + } + + return true; + } + } + return false; + } + }); + + for (CarbonFile cfile : files) { + updatedDeltaFilesList.add(cfile.getCanonicalPath()); + } + + return updatedDeltaFilesList; + } + + /** + * Returns all deleted records of specified block + * + * @param tupleId + * @return + * @throws Exception + */ + public int[] getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception { + List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT); + CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(); + String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID); + return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId); + } + + + + /** + * Returns all delta file paths of specified block + * + * @param tupleId + * @param extension + * @return + * @throws Exception + */ + public List<String> getDeltaFiles(String tupleId, String extension) throws Exception { + try { + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); + String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", 
segment); + String completeBlockName = CarbonTablePath.addDataPartPrefix( + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID) + + CarbonCommonConstants.FACT_FILE_EXT); + String blockPath = + carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; + CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath)); + if (!file.exists()) { + throw new Exception("Invalid tuple id " + tupleId); + } + String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.')); + //blockName without timestamp + final String blockNameFromTuple = + blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-")); + SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray = + readLoadMetadata(); + return getDeltaFiles(file, blockNameFromTuple, listOfSegmentUpdateDetailsArray, extension, + segment); + + } catch (Exception ex) { + String errorMsg = "Invalid tuple id " + tupleId; + LOG.error(errorMsg); + throw new Exception(errorMsg); + } + } + + /** + * This method returns the list of Blocks associated with the segment + * from the SegmentUpdateDetails List. 
+ * @param segmentName + * @return + */ + public List<String> getBlockNameFromSegment(String segmentName) { + List<String> blockNames = new ArrayList<String>(); + for (SegmentUpdateDetails block : updateDetails) { + if (block.getSegmentName().equalsIgnoreCase(segmentName) && !CarbonUpdateUtil + .isBlockInvalid(block.getStatus())) { + blockNames.add(block.getBlockName()); + } + } + return blockNames; + } + + /** + * + * @param segName + * @param blockName + * @return + */ + public boolean isBlockValid(String segName, String blockName) { + + SegmentUpdateDetails details = getDetailsForABlock(segName, blockName); + + if (details == null || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) { + return true; + } + + return false; + } + /** + * Returns all delta file paths of specified block + * + * @param blockDir + * @param blockNameFromTuple + * @param listOfSegmentUpdateDetailsArray + * @param extension + * @return + */ + public List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple, + SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray, + final String extension, + String segment) { + List<String> deleteFileList = null; + for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) { + if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName() + .equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) { + final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block); + // If there is no delete delete file , then return null + if(deltaStartTimestamp == 0) { + return deleteFileList; + } + final long deltaEndTimeStamp = getEndTimeOfDeltaFile(extension, block); + + // final long deltaEndTimeStamp = block.getDeleteDeltaEndTimeAsLong(); + // final long deltaStartTimestamp = block.getDeleteDeltaStartTimeAsLong(); + return getFilePaths(blockDir, blockNameFromTuple, extension, deleteFileList, + deltaStartTimestamp, deltaEndTimeStamp); + } + } + return deleteFileList; + } 
+ + private List<String> getFilePaths(CarbonFile blockDir, final String blockNameFromTuple, + final String extension, List<String> deleteFileList, final long deltaStartTimestamp, + final long deltaEndTimeStamp) { + CarbonFile[] files = blockDir.getParentFile().listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile pathName) { + String fileName = pathName.getName(); + if (fileName.endsWith(extension)) { + String firstPart = fileName.substring(0, fileName.indexOf('.')); + String blockName = + firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN)); + long timestamp = Long.parseLong(firstPart + .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + firstPart.length())); + if (blockNameFromTuple.equals(blockName) && (( + Long.compare(timestamp, deltaEndTimeStamp ) <= 0) && ( + Long.compare(timestamp, deltaStartTimestamp) >= 0))) { + return true; + } + } + return false; + } + }); + + for (CarbonFile cfile : files) { + if(null == deleteFileList) { + deleteFileList = new ArrayList<String>(files.length); + } + deleteFileList.add(cfile.getCanonicalPath()); + } + return deleteFileList; + } + + /** + * Return all delta file for a block. 
+ * @param segmentId + * @param blockName + * @return + */ + public CarbonFile[] getDeleteDeltaFilesList(final String segmentId, final String blockName) { + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + + CarbonFile segDir = + FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); + + for (SegmentUpdateDetails block : updateDetails) { + if ((block.getBlockName().equalsIgnoreCase(blockName)) && + (block.getSegmentName().equalsIgnoreCase(segmentId)) + && !CarbonUpdateUtil.isBlockInvalid((block.getStatus()))) { + final long deltaStartTimestamp = + getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); + final long deltaEndTimeStamp = + getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); + CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile pathName) { + String fileName = pathName.getName(); + if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) { + String firstPart = fileName.substring(0, fileName.indexOf('.')); + String blkName = firstPart.substring(0, firstPart.lastIndexOf("-")); + long timestamp = Long.parseLong( + firstPart.substring(firstPart.lastIndexOf("-") + 1, firstPart.length())); + if (blockName.equals(blkName) && (Long.compare(timestamp, deltaEndTimeStamp) <= 0) + && (Long.compare(timestamp, deltaStartTimestamp) >= 0)) { + return true; + } + } + return false; + } + }); + + return files; + } + } + return null; + } + + /** + * Returns all update delta files of specified Segment. + * + * @param segmentId + * @param validUpdateFiles if true then only the valid range files will be returned. 
+ * @return + */ + public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean validUpdateFiles, + final String fileExtension, final boolean excludeOriginalFact, + CarbonFile[] allFilesOfSegment) { + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + String endTimeStamp = ""; + String startTimeStamp = ""; + long factTimeStamp = 0; + + LoadMetadataDetails[] segmentDetails = + segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + + for (LoadMetadataDetails eachSeg : segmentDetails) { + if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) { + // if the segment is found then take the start and end time stamp. + startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp(); + endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp(); + factTimeStamp = eachSeg.getLoadStartTime(); + } + } + + // if start timestamp is empty then no update delta is found. so return empty list. + if (startTimeStamp.isEmpty()) { + return new CarbonFile[0]; + } + + final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp); + final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp); + final long factTimeStampFinal = factTimeStamp; + + List<CarbonFile> listOfCarbonFiles = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + // else scan the segment for the delta files with the respective timestamp. 
+ + for (CarbonFile eachFile : allFilesOfSegment) { + + String fileName = eachFile.getName(); + if (fileName.endsWith(fileExtension)) { + String firstPart = fileName.substring(0, fileName.indexOf('.')); + + long timestamp = Long.parseLong(firstPart + .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + firstPart.length())); + + if (excludeOriginalFact) { + if (Long.compare(factTimeStampFinal, timestamp) == 0) { + continue; + } + } + + if (validUpdateFiles) { + if (Long.compare(timestamp, endTimeStampFinal) <= 0 + && Long.compare(timestamp, startTimeStampFinal) >= 0) { + listOfCarbonFiles.add(eachFile); + } + } else { + // invalid cases. + if (Long.compare(timestamp, startTimeStampFinal) < 0) { + listOfCarbonFiles.add(eachFile); + } + } + } + } + + return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]); + } + + /** + * Returns all update delta files of specified Segment. + * + * @param segmentId + * @param validUpdateFiles + * @param fileExtension + * @param excludeOriginalFact + * @param allFilesOfSegment + * @return + */ + public CarbonFile[] getUpdateDeltaFilesForSegment(String segmentId, + final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact, + CarbonFile[] allFilesOfSegment) { + + String endTimeStamp = ""; + String startTimeStamp = ""; + long factTimeStamp = 0; + + for (LoadMetadataDetails eachSeg : segmentDetails) { + if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) { + // if the segment is found then take the start and end time stamp. + startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp(); + endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp(); + factTimeStamp = eachSeg.getLoadStartTime(); + } + } + + // if start timestamp is empty then no update delta is found. so return empty list. 
+ if (startTimeStamp.isEmpty()) { + return new CarbonFile[0]; + } + + final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp); + final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp); + final long factTimeStampFinal = factTimeStamp; + + List<CarbonFile> listOfCarbonFiles = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + // else scan the segment for the delta files with the respective timestamp. + + for (CarbonFile eachFile : allFilesOfSegment) { + + String fileName = eachFile.getName(); + if (fileName.endsWith(fileExtension)) { + String firstPart = fileName.substring(0, fileName.indexOf('.')); + + long timestamp = Long.parseLong(firstPart + .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + firstPart.length())); + + if (excludeOriginalFact) { + if (Long.compare(factTimeStampFinal, timestamp) == 0) { + continue; + } + } + + if (validUpdateFiles) { + if (Long.compare(timestamp, endTimeStampFinal) <= 0 + && Long.compare(timestamp, startTimeStampFinal) >= 0) { + + boolean validBlock = true; + + for(SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) { + if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName()) + && CarbonUpdateUtil.isBlockInvalid(blockDetails.getStatus())) { + validBlock = false; + } + } + + if (validBlock) { + listOfCarbonFiles.add(eachFile); + } + + } + } else { + // invalid cases. 
+ if (Long.compare(timestamp, startTimeStampFinal) < 0) { + listOfCarbonFiles.add(eachFile); + } + } + } + } + + return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]); + } + + /** + * + * @param extension + * @param block + * @return + */ + private long getStartTimeOfDeltaFile(String extension, SegmentUpdateDetails block) { + long startTimestamp; + switch (extension) { + case CarbonCommonConstants.DELETE_DELTA_FILE_EXT: + startTimestamp = block.getDeleteDeltaStartTimeAsLong(); + break; + default: + startTimestamp = 0; + } + return startTimestamp; + } + + /** + * + * @param extension + * @param block + * @return + */ + private long getEndTimeOfDeltaFile(String extension, SegmentUpdateDetails block) { + long endTimestamp; + switch (extension) { + case CarbonCommonConstants.DELETE_DELTA_FILE_EXT: + endTimestamp = block.getDeleteDeltaEndTimeAsLong(); + break; + default: endTimestamp = 0; + } + return endTimestamp; + } + + + /** + * This method loads segment update details + * + * @return + */ + public SegmentUpdateDetails[] readLoadMetadata() { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray; + + // get the updated status file identifier from the table status. 
+ String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier(); + + if(null == tableUpdateStatusIdentifier) { + return new SegmentUpdateDetails[0]; + } + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + String tableUpdateStatusPath = + carbonTablePath.getMetadataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR + + tableUpdateStatusIdentifier; + AtomicFileOperations fileOperation = new AtomicFileOperationsImpl(tableUpdateStatusPath, + FileFactory.getFileType(tableUpdateStatusPath)); + + try { + if (!FileFactory + .isFileExist(tableUpdateStatusPath, FileFactory.getFileType(tableUpdateStatusPath))) { + return new SegmentUpdateDetails[0]; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT); + buffReader = new BufferedReader(inStream); + listOfSegmentUpdateDetailsArray = + gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class); + } catch (IOException e) { + return new SegmentUpdateDetails[0]; + } finally { + closeStreams(buffReader, inStream, dataInputStream); + } + + return listOfSegmentUpdateDetailsArray; + } + + /** + * @return updateStatusFileName + */ + private String getUpdatedStatusIdentifier() { + SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + LoadMetadataDetails[] loadDetails = + ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + if (loadDetails.length == 0) { + return null; + } + return loadDetails[0].getUpdateStatusFileName(); + } + + /** + * writes segment update details into a given file at @param dataLoadLocation + * + * @param listOfSegmentUpdateDetailsArray + * @throws IOException + 
*/ + public void writeLoadDetailsIntoFile(List<SegmentUpdateDetails> listOfSegmentUpdateDetailsArray, + String updateStatusFileIdentifier) throws IOException { + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + String fileLocation = + carbonTablePath.getMetadataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR + + CarbonUpdateUtil.getUpdateStatusFileName(updateStatusFileIdentifier); + + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(fileLocation, FileFactory.getFileType(fileLocation)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + // write the updated data into the metadata file. + + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT)); + + String metadataInstance = gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray); + brWriter.write(metadataInstance); + } catch (IOException ioe) { + LOG.error("Error message: " + ioe.getLocalizedMessage()); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + + } + + /** + * compares passed time stamp with status file delete timestamp and + * returns latest timestamp from status file if both are not equal + * returns null otherwise + * + * @param completeBlockName + * @param timestamp + * @return + */ + public String getTimestampForRefreshCache(String completeBlockName, String timestamp) { + long cacheTimestamp = 0; + if (null != timestamp) { + cacheTimestamp = CarbonUpdateUtil.getTimeStampAsLong(timestamp); + } + String blockName = CarbonTablePath.addDataPartPrefix(CarbonUpdateUtil.getBlockName( + CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, 
TupleIdEnum.BLOCK_ID))); + String segmentId = + CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.SEGMENT_ID); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray = + readLoadMetadata(); + for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) { + if (segmentId.equalsIgnoreCase(block.getSegmentName()) && block.getBlockName() + .equalsIgnoreCase(blockName) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) { + long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong(); + if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) { + return null; + } else { + return block.getDeleteDeltaEndTimestamp(); + } + } + } + return null; + } + + /** + * This method closes the streams + * + * @param streams - streams to close. + */ + private void closeStreams(Closeable... streams) { + // Added if to avoid NullPointerException in case one stream is being passed as null + if (null != streams) { + for (Closeable stream : streams) { + if (null != stream) { + try { + stream.close(); + } catch (IOException e) { + LOG.error("Error while closing stream" + stream); + } + } + } + } + } + /** + * Get the invalid tasks in that segment. + * @param segmentId + * @return + */ + public List<String> getInvalidBlockList(String segmentId) { + + // get the original fact file timestamp from the table status file. 
+ List<String> listOfInvalidBlocks = new ArrayList<String>(); + SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + LoadMetadataDetails[] segmentDetails = + ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + long timestampOfOriginalFacts = 0; + + String startTimestampOfUpdate = "" ; + String endTimestampOfUpdate = ""; + + for(LoadMetadataDetails segment : segmentDetails){ + // find matching segment and return timestamp. + if (segment.getLoadName().equalsIgnoreCase(segmentId)) { + timestampOfOriginalFacts = segment.getLoadStartTime(); + startTimestampOfUpdate = segment.getUpdateDeltaStartTimestamp(); + endTimestampOfUpdate = segment.getUpdateDeltaEndTimestamp(); + } + } + + if (startTimestampOfUpdate.isEmpty()) { + return listOfInvalidBlocks; + + } + + // now after getting the original fact timestamp, what ever is remaining + // files need to cross check it with table status file. + + // filter out the fact files. 
+ + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + CarbonFile segDir = + FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); + + final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimestampOfUpdate); + final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimestampOfUpdate); + final Long timeStampOriginalFactFinal = + timestampOfOriginalFacts; + + CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile pathName) { + String fileName = pathName.getName(); + if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) { + String firstPart = fileName.substring(0, fileName.indexOf('.')); + + long timestamp = Long.parseLong(firstPart + .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + firstPart.length())); + if (Long.compare(timestamp, endTimeStampFinal) <= 0 + && Long.compare(timestamp, startTimeStampFinal) >= 0) { + return false; + } + if (Long.compare(timestamp, timeStampOriginalFactFinal) == 0) { + return false; + } + // take the rest of files as they are invalid. + return true; + } + return false; + } + }); + + // gather the task numbers. + for(CarbonFile updateFiles : files) { + listOfInvalidBlocks.add(updateFiles.getName()); + } + + return listOfInvalidBlocks; + } + /** + * Returns the invalid timestamp range of a segment. 
+ * @param segmentId + * @return + */ + public UpdateVO getInvalidTimestampRange(String segmentId) { + UpdateVO range = new UpdateVO(); + for (LoadMetadataDetails segment : segmentDetails) { + if (segment.getLoadName().equalsIgnoreCase(segmentId)) { + range.setSegmentId(segmentId); + range.setFactTimestamp(segment.getLoadStartTime()); + if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment + .getUpdateDeltaEndTimestamp().isEmpty()) { + range.setUpdateDeltaStartTimestamp( + CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp())); + range.setLatestUpdateTimestamp( + CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp())); + } + } + } + return range; + } + /** + * + * @param segmentId + * @param block + * @param needCompleteList + * @return + */ + public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId, + final SegmentUpdateDetails block, final boolean needCompleteList, + CarbonFile[] allSegmentFiles) { + + final long deltaStartTimestamp = + getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); + + List<CarbonFile> files = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + for (CarbonFile eachFile : allSegmentFiles) { + String fileName = eachFile.getName(); + if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) { + String blkName = CarbonTablePath.DataFileUtil.getBlockNameFromDeleteDeltaFile(fileName); + + // complete list of delta files of that block is returned. + if (needCompleteList && block.getBlockName().equalsIgnoreCase(blkName)) { + files.add(eachFile); + } + + // invalid delete delta files only will be returned. 
+ long timestamp = CarbonUpdateUtil.getTimeStampAsLong( + CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName)); + + if (block.getBlockName().equalsIgnoreCase(blkName) && ( + Long.compare(timestamp, deltaStartTimestamp) < 0)) { + files.add(eachFile); + } + } + } + + return files.toArray(new CarbonFile[files.size()]); + } + + /** + * + * @param blockName + * @param allSegmentFiles + * @return + */ + public CarbonFile[] getAllBlockRelatedFiles(String blockName, CarbonFile[] allSegmentFiles, + String actualBlockName) { + List<CarbonFile> files = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + for (CarbonFile eachFile : allSegmentFiles) { + + // for carbon data. + if (eachFile.getName().equalsIgnoreCase(actualBlockName)) { + files.add(eachFile); + } + + // get carbon index files of the block. + String taskNum = CarbonTablePath.DataFileUtil.getTaskNo(actualBlockName); + // String indexFileEndsWith = timestamp + CarbonTablePath.getCarbonIndexExtension(); + if (eachFile.getName().endsWith(CarbonTablePath.getCarbonIndexExtension()) && eachFile + .getName().startsWith(taskNum)) { + files.add(eachFile); + } + + } + + return files.toArray(new CarbonFile[files.size()]); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java b/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java deleted file mode 100644 index 49a9dc4..0000000 --- a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
/**
 * Holds the JVM's {@code sun.misc.Unsafe} instance together with the base offsets of the
 * primitive array types.
 * <p>
 * The instance is obtained reflectively from the {@code theUnsafe} field when the class is
 * initialised. If that fails for any reason, {@link #unsafe} is {@code null} and every offset
 * constant is 0.
 */
public final class CarbonUnsafe {

  public static final int BYTE_ARRAY_OFFSET;

  public static final int SHORT_ARRAY_OFFSET;

  public static final int INT_ARRAY_OFFSET;

  public static final int LONG_ARRAY_OFFSET;

  public static final int DOUBLE_ARRAY_OFFSET;

  /** The Unsafe instance, or {@code null} when it could not be obtained. */
  public static final Unsafe unsafe;

  static {
    Unsafe instance;
    try {
      Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
      theUnsafe.setAccessible(true);
      instance = (Unsafe) theUnsafe.get(null);
    } catch (Throwable ignored) {
      // Best effort: without access to Unsafe the class still loads, with a null instance.
      instance = null;
    }
    unsafe = instance;

    if (unsafe != null) {
      BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class);
      SHORT_ARRAY_OFFSET = unsafe.arrayBaseOffset(short[].class);
      INT_ARRAY_OFFSET = unsafe.arrayBaseOffset(int[].class);
      LONG_ARRAY_OFFSET = unsafe.arrayBaseOffset(long[].class);
      DOUBLE_ARRAY_OFFSET = unsafe.arrayBaseOffset(double[].class);
    } else {
      BYTE_ARRAY_OFFSET = 0;
      SHORT_ARRAY_OFFSET = 0;
      INT_ARRAY_OFFSET = 0;
      LONG_ARRAY_OFFSET = 0;
      DOUBLE_ARRAY_OFFSET = 0;
    }
  }
}