This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 3.0.x in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/3.0.x by this push: new f81c2f9 KYLIN-4396 Close FileReader in SaveDictStep f81c2f9 is described below commit f81c2f967e978d22a9a1ea08630ed9d21f06d56f Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Wed Mar 4 09:51:57 2020 +0800 KYLIN-4396 Close FileReader in SaveDictStep --- .../mr/streaming/ColumnarSplitDictReader.java | 2 +- .../kylin/engine/mr/streaming/DictsReader.java | 2 + .../kylin/engine/mr/streaming/SaveDictStep.java | 46 +++++++++++----------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java index 5b46270..de17b23 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnarSplitDictReader.java @@ -68,7 +68,7 @@ public class ColumnarSplitDictReader extends ColumnarSplitReader { itr = set.iterator(); readCount = new AtomicInteger(0); - logger.debug("Reader for dictinary reader initialized. "); + logger.info("Reader for dictionary reader initialized. "); } @Override diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java index e10a012..ecbe8cc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/DictsReader.java @@ -51,10 +51,12 @@ public class DictsReader extends ColumnarFilesReader { dataInputStream = fs.open(dataFilePath); Dictionary dict; String colName; + logger.info("Reading dictionary from {}", dataFilePath.getName()); for (DimDictionaryMetaInfo dimDictMetaInfo : dimDictMetaInfos) { dataInputStream.seek(dimDictMetaInfo.getStartOffset()); dict = DictionarySerializer.deserialize(dataInputStream); colName = dimDictMetaInfo.getDimName(); + logger.info("Add dict for {}", colName); builder.put(colName, dict); } return builder.build(); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java index fdd69ce..ae2fbe4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/SaveDictStep.java @@ -108,32 +108,32 @@ public class SaveDictStep extends AbstractExecutable { } }); - SequenceFile.Reader reader; for (FileStatus file : files) { - reader = new SequenceFile.Reader(fs, file.getPath(), conf); - Text colName = new Text(); - Text dictInfo = new Text(); - while (reader.next(colName, dictInfo)) { - TblColRef colRef = colRefMap.get(colName.toString()); - if (colRef == null) { - throw new IllegalArgumentException("Invalid column name " + colName - + " or it need not build dictionary!"); - } - DictionaryInfo dictionaryInfo = serializer.deserialize(new DataInputStream( - new ByteArrayInputStream(dictInfo.getBytes()))); - - Dictionary dict = dictionaryInfo.getDictionaryObject(); - if (dict != null) { - dictionaryInfo = dictManager.trySaveNewDict(dict, dictionaryInfo); - cubeSeg.putDictResPath(colRef, dictionaryInfo.getResourcePath()); - if (cubeSeg.getRowkeyStats() != null) { - cubeSeg.getRowkeyStats().add( - new Object[] { colRef.getName(), dict.getSize(), dict.getSizeOfId() }); + try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf)) { + Text colName = new Text(); + Text dictInfo = new Text(); + while (reader.next(colName, dictInfo)) { + TblColRef colRef = colRefMap.get(colName.toString()); + if (colRef == null) { + throw new IllegalArgumentException("Invalid column name " + colName + + " or it need not build dictionary!"); + } + DictionaryInfo dictionaryInfo = serializer.deserialize(new DataInputStream( + new ByteArrayInputStream(dictInfo.getBytes()))); + + Dictionary dict = dictionaryInfo.getDictionaryObject(); + if (dict != null) { + dictionaryInfo = dictManager.trySaveNewDict(dict, dictionaryInfo); + cubeSeg.putDictResPath(colRef, dictionaryInfo.getResourcePath()); + if (cubeSeg.getRowkeyStats() != null) { + cubeSeg.getRowkeyStats().add( + new Object[]{colRef.getName(), dict.getSize(), dict.getSizeOfId()}); + } else { + logger.error("rowkey_stats field not found!"); + } } else { - logger.error("rowkey_stats field not found!"); + logger.error("dictionary of column {} not found! ", colRef.getName()); } - } else { - logger.error("dictionary of column {} not found! ", colRef.getName()); } } }