Repository: incubator-kylin Updated Branches: refs/heads/0.8 05947f31e -> ce4511b7a
bug fix: cube_statistics file ts not right Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ce4511b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ce4511b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ce4511b7 Branch: refs/heads/0.8 Commit: ce4511b7a78b95801fb729180a13691f553d8837 Parents: 5c33d5a Author: honma <ho...@ebay.com> Authored: Wed Aug 12 10:18:02 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Wed Aug 12 10:18:18 2015 +0800 ---------------------------------------------------------------------- .../kylin/job/streaming/CubeStreamConsumer.java | 28 +++++---- .../storage/hbase/HBaseStreamingOutput.java | 60 +++++++------------- 2 files changed, 40 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce4511b7/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java index 5670224..4a909ac 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ package org.apache.kylin.job.streaming; import java.io.IOException; @@ -9,7 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -37,20 +53,12 @@ import org.apache.kylin.source.ReadableTable.TableSignature; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.HBaseCuboidWriter; import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; -import org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator; import org.apache.kylin.streaming.MicroStreamBatch; import org.apache.kylin.streaming.MicroStreamBatchConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; /** */ @@ -96,7 +104,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer { FileSystem.getLocal(conf).deleteOnExit(outputPath); FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100); FSDataInputStream localStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)); - ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), localStream, 0); + ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), localStream, System.currentTimeMillis()); localStream.close(); final Map<TblColRef, Dictionary<?>> dictionaryMap = CubingUtils.buildDictionary(cubeInstance, parsedStreamMessages); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce4511b7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStreamingOutput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStreamingOutput.java index 10291fa..7d10b3d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStreamingOutput.java @@ -1,38 +1,26 @@ /* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ package org.apache.kylin.storage.hbase; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -53,14 +41,10 @@ import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; -import java.util.UUID; - /** */ public class HBaseStreamingOutput implements IStreamingOutput { - + private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class); @Override @@ -87,7 +71,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { FSDataInputStream inputStream = null; try { inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)); - ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, 0); + ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis()); } finally { IOUtils.closeQuietly(inputStream); }