KYLIN-1726 fix 'FileSystem Closed' error
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/afdec89f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/afdec89f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/afdec89f

Branch: refs/heads/orderedbytes
Commit: afdec89fe09dcb28b368775f8b830c78f74e7489
Parents: b0aa327
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sun Oct 9 19:06:07 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Sun Oct 9 19:08:41 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/UpdateTimeRangeStep.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/afdec89f/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
index bb64bf9..9e902d8 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -63,7 +64,12 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
         final Path outputFile = new Path(outputPath, partitionCol.getName());
 
         String minValue = null, maxValue = null, currentValue = null;
-        try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+        FSDataInputStream inputStream = null;
+        BufferedReader bufferedReader = null;
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            inputStream = fs.open(outputFile);
+            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
             minValue = currentValue = bufferedReader.readLine();
             while (currentValue != null) {
                 maxValue = currentValue;
@@ -72,6 +78,9 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
         } catch (IOException e) {
             logger.error("fail to read file " + outputFile, e);
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } finally {
+            IOUtils.closeQuietly(bufferedReader);
+            IOUtils.closeQuietly(inputStream);
         }
 
         final DataType partitionColType = partitionCol.getType();