Repository: kylin Updated Branches: refs/heads/yang21-hbase1.x 0d3af7e00 -> 6663adeec (forced update)
KYLIN-2170 refactor doMap/doReduce/doCleanup Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1e24228e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1e24228e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1e24228e Branch: refs/heads/yang21-hbase1.x Commit: 1e24228ea9e4a070923a854e6a50fbe561ccc378 Parents: 3b07c26 Author: Li Yang <liy...@apache.org> Authored: Wed Nov 9 16:11:24 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Nov 9 16:11:24 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin_job_conf_inmem.xml | 2 +- .../org/apache/kylin/engine/mr/KylinMapper.java | 47 +++++++++++++- .../apache/kylin/engine/mr/KylinReducer.java | 51 ++++++++++++++- .../engine/mr/steps/BaseCuboidMapperBase.java | 2 +- .../kylin/engine/mr/steps/CuboidReducer.java | 2 +- .../mr/steps/FactDistinctColumnPartitioner.java | 1 + .../mr/steps/FactDistinctColumnsCombiner.java | 2 +- .../mr/steps/FactDistinctColumnsMapperBase.java | 2 +- .../mr/steps/FactDistinctColumnsReducer.java | 42 ++++++------ .../mr/steps/FactDistinctHiveColumnsMapper.java | 38 +++++------ .../engine/mr/steps/HiveToBaseCuboidMapper.java | 2 +- .../engine/mr/steps/InMemCuboidMapper.java | 28 ++++---- .../engine/mr/steps/InMemCuboidReducer.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../kylin/engine/mr/steps/NDCuboidMapper.java | 2 +- .../steps/RowKeyDistributionCheckerMapper.java | 16 ++--- .../steps/RowKeyDistributionCheckerReducer.java | 2 +- .../cardinality/ColumnCardinalityMapper.java | 26 ++++---- .../cardinality/ColumnCardinalityReducer.java | 40 +++++------- .../storage/hbase/steps/CubeHFileMapper.java | 2 +- .../hbase/steps/RangeKeyDistributionMapper.java | 14 ++-- .../steps/RangeKeyDistributionReducer.java | 68 +++++++++----------- .../storage/hbase/steps/RowValueDecoder.java | 4 -- 23 files changed, 227 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/build/conf/kylin_job_conf_inmem.xml ---------------------------------------------------------------------- diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml index d6799d5..7e6dc08 100644 --- a/build/conf/kylin_job_conf_inmem.xml +++ b/build/conf/kylin_job_conf_inmem.xml @@ -94,7 +94,7 @@ <property> <name>mapreduce.map.java.opts</name> - <value>-Xmx2700m</value> + <value>-Xmx2700m -XX:OnOutOfMemoryError="kill -9 %p"</value> <description></description> </property> http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java index 29c6844..a527b3d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java @@ -18,6 +18,8 @@ package org.apache.kylin.engine.mr; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Mapper; import org.slf4j.Logger; @@ -25,11 +27,54 @@ import org.slf4j.LoggerFactory; /** */ -public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { +abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class); protected void bindCurrentConfiguration(Configuration conf) { logger.info("The conf for current mapper will be " + System.identityHashCode(conf)); HadoopUtil.setCurrentConfiguration(conf); } + + @Override + final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + try { + doMap(key, value, context); + } catch (IOException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (InterruptedException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (RuntimeException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (Error ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } + } + + abstract protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException; + + @Override + final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + try { + doCleanup(context); + } catch (IOException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (InterruptedException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (RuntimeException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (Error ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } + } + + protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java index 846c849..2987032 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java @@ -18,13 +18,62 @@ package org.apache.kylin.engine.mr; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Reducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** */ -public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { +abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { + private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class); + protected void bindCurrentConfiguration(Configuration conf) { HadoopUtil.setCurrentConfiguration(conf); } + + @Override + final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + try { + doReduce(key, values, context); + } catch (IOException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (InterruptedException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (RuntimeException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (Error ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } + } + + abstract protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException; + + @Override + final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + try { + doCleanup(context); + } catch (IOException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (InterruptedException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (RuntimeException ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } catch (Error ex) { // KYLIN-2170 + logger.error("", ex); + throw ex; + } + } + + protected void doCleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 4f0d3fd..0649a0c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -55,7 +55,7 @@ import com.google.common.collect.Lists; /** */ -public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { +abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class); public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); public static final byte[] ONE = Bytes.toBytes("1"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index d6e1d7e..9543f0a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -86,7 +86,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { } @Override - public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { aggs.reset(); for (Text value : values) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index a631cf4..7801563 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -26,6 +26,7 @@ import org.apache.kylin.common.util.BytesUtil; /** */ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> { + @SuppressWarnings("unused") private Configuration conf; @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java index 1821828..2dda047 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java @@ -34,7 +34,7 @@ public class FactDistinctColumnsCombiner extends KylinReducer<Text, Text, Text, } @Override - public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // for hll, each key only has one output, no need to do local combine; // for normal col, values are empty text http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 3fa966d..cb30b94 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -41,7 +41,7 @@ import org.apache.kylin.metadata.model.TblColRef; /** */ -public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { +abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { protected String cubeName; protected CubeInstance cube; http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index a9c5d4b..6f0bf4c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -101,7 +101,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri } @Override - public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { if (isStatistics == false) { colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); @@ -161,29 +161,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - if (isStatistics == false) { - if (!outputTouched || colValues.size() > 0) { - outputDistinctValues(col, colValues, context); - colValues.clear(); - } - } else { - //output the hll info; - long grandTotal = 0; - for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { - grandTotal += hll.getCountEstimate(); - } - double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - - int mapperNumber = baseCuboidRowCountInMappers.size(); - - writeMapperAndCuboidStatistics(context); // for human check - CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // - cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); + protected void doCleanup(Context context) throws IOException, InterruptedException { + if (isStatistics == false) { + if (!outputTouched || colValues.size() > 0) { + outputDistinctValues(col, colValues, context); + colValues.clear(); } - } catch (Throwable ex) { - logger.error("", ex); + } else { + //output the hll info; + long grandTotal = 0; + for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { + grandTotal += hll.getCountEstimate(); + } + double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + + int mapperNumber = baseCuboidRowCountInMappers.size(); + + writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // + cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 2154bc6..061fc80 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -106,7 +106,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap } @Override - public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException { + public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { String[] row = flatTableInputFormat.parseMapperInput(record); try { for (int i = 0; i < factDictCols.size(); i++) { @@ -157,27 +157,23 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - if (collectStatistics) { - ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - // output each cuboid's hll to reducer, key is 0 - cuboidId - HyperLogLogPlusCounter hll; - for (int i = 0; i < cuboidIds.length; i++) { - hll = allCuboidsHLL[i]; - - keyBuffer.clear(); - keyBuffer.put(MARK_FOR_HLL); // one byte - keyBuffer.putLong(cuboidIds[i]); - outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); - hllBuf.clear(); - hll.writeRegisters(hllBuf); - outputValue.set(hllBuf.array(), 0, hllBuf.position()); - context.write(outputKey, outputValue); - } + protected void doCleanup(Context context) throws IOException, InterruptedException { + if (collectStatistics) { + ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + // output each cuboid's hll to reducer, key is 0 - cuboidId + HyperLogLogPlusCounter hll; + for (int i = 0; i < cuboidIds.length; i++) { + hll = allCuboidsHLL[i]; + + keyBuffer.clear(); + keyBuffer.put(MARK_FOR_HLL); // one byte + keyBuffer.putLong(cuboidIds[i]); + outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); + hllBuf.clear(); + hll.writeRegisters(hllBuf); + outputValue.set(hllBuf.array(), 0, hllBuf.position()); + context.write(outputKey, outputValue); } - } catch (Throwable ex) { - ex.printStackTrace(); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index 83926cc..d9c5312 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -38,7 +38,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O } @Override - public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException { + public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException { counter++; if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Handled " + counter + " records!"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index dac93cb..15bfd2e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -113,7 +113,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr } @Override - public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException { + public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { // put each row to the queue String[] row = flatTableInputFormat.parseMapperInput(record); List<String> rowAsList = Arrays.asList(row); @@ -130,25 +130,21 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { + protected void doCleanup(Context context) throws IOException, InterruptedException { logger.info("Totally handled " + counter + " records!"); - try { - while (!future.isDone()) { - if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { - break; - } - } - - try { - future.get(); - } catch (Exception e) { - throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); + while (!future.isDone()) { + if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { + break; } - queue.clear(); - } catch (Throwable ex) { - logger.error("", ex); } + + try { + future.get(); + } catch (Exception e) { + throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); + } + queue.clear(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index cfecf23..d0a7062 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -73,7 +73,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra } @Override - public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException { + public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException { aggs.reset(); http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 55b8474..67c0f4c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -169,7 +169,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { } @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { + public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidID = rowKeySplitter.split(key.getBytes()); Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index b566c2e..8107e52 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -120,7 +120,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { } @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { + public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidId = rowKeySplitter.split(key.getBytes()); Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java index 21e97a3..fca91a6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java @@ -63,7 +63,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex } @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { + public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { for (Text t : keyList) { if (key.compareTo(t) < 0) { Long v = resultMap.get(t); @@ -76,15 +76,11 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - LongWritable outputValue = new LongWritable(); - for (Entry<Text, Long> kv : resultMap.entrySet()) { - outputValue.set(kv.getValue()); - context.write(kv.getKey(), outputValue); - } - } catch (Throwable ex) { - ex.printStackTrace(); + protected void doCleanup(Context context) throws IOException, InterruptedException { + LongWritable outputValue = new LongWritable(); + for (Entry<Text, Long> kv : resultMap.entrySet()) { + outputValue.set(kv.getValue()); + context.write(kv.getKey(), outputValue); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java index 332cba5..d203e8c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java @@ -38,7 +38,7 @@ public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWri } @Override - public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long length = 0; for (LongWritable v : values) { http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index 8c624e3..06a07ca 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -66,7 +66,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab } @Override - public void map(T key, Object value, Context context) throws IOException, InterruptedException { + public void doMap(T key, Object value, Context context) throws IOException, InterruptedException { ColumnDesc[] columns = tableDesc.getColumns(); String[] values = tableInputFormat.parseMapperInput(value); @@ -95,20 +95,16 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - Iterator<Integer> it = hllcMap.keySet().iterator(); - ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - while (it.hasNext()) { - int key = it.next(); - HyperLogLogPlusCounter hllc = hllcMap.get(key); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit())); - } - } catch (Throwable ex) { - ex.printStackTrace(); + protected void doCleanup(Context context) throws IOException, InterruptedException { + Iterator<Integer> it = hllcMap.keySet().iterator(); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + while (it.hasNext()) { + int key = it.next(); + HyperLogLogPlusCounter hllc = hllcMap.get(key); + buf.clear(); + hllc.writeRegisters(buf); + buf.flip(); + context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit())); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java index 2551af3..ea66999 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java @@ -49,7 +49,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri } @Override - public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { + public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { int skey = key.get(); for (BytesWritable v : values) { ByteBuffer buffer = ByteBuffer.wrap(v.getBytes()); @@ -68,28 +68,22 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - List<Integer> keys = new ArrayList<Integer>(); - Iterator<Integer> it = hllcMap.keySet().iterator(); - while (it.hasNext()) { - keys.add(it.next()); - } - Collections.sort(keys); - it = keys.iterator(); - while (it.hasNext()) { - int key = it.next(); - HyperLogLogPlusCounter hllc = hllcMap.get(key); - ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate())); - // context.write(new Text("ErrorRate_" + key), new - // LongWritable((long)hllc.getErrorRate())); - } - } catch (Throwable ex) { - ex.printStackTrace(); + protected void doCleanup(Context context) throws IOException, InterruptedException { + List<Integer> keys = new ArrayList<Integer>(); + Iterator<Integer> it = hllcMap.keySet().iterator(); + while (it.hasNext()) { + keys.add(it.next()); + } + Collections.sort(keys); + it = keys.iterator(); + while (it.hasNext()) { + int key = it.next(); + HyperLogLogPlusCounter hllc = hllcMap.get(key); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + buf.clear(); + hllc.writeRegisters(buf); + buf.flip(); + context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate())); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java index 8205ff7..371a83b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java @@ -74,7 +74,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita } @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { + public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { outputKey.set(key.getBytes(), 0, key.getLength()); KeyValue outputValue; http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java index c82d58d..c66ccb3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java @@ -44,7 +44,7 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo } @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { + public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { lastKey = key; int bytesLength = key.getLength() + value.getLength(); @@ -61,14 +61,10 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - if (lastKey != null) { - outputValue.set(bytesRead); - context.write(lastKey, outputValue); - } - } catch (Throwable ex) { - ex.printStackTrace(); + protected void doCleanup(Context context) throws IOException, InterruptedException { + if (lastKey != null) { + outputValue.set(bytesRead); + context.write(lastKey, outputValue); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java index e9918d4..63433dd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java @@ -84,7 +84,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable } @Override - public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { + public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { for (LongWritable v : values) { bytesRead += v.get(); } @@ -96,42 +96,38 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - int nRegion = Math.round((float) gbPoints.size() / cut); - nRegion = Math.max(minRegionCount, nRegion); - nRegion = Math.min(maxRegionCount, nRegion); - - int gbPerRegion = gbPoints.size() / nRegion; - gbPerRegion = Math.max(1, gbPerRegion); - - if (hfileSizeGB <= 0) { - hfileSizeGB = gbPerRegion; - } - int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB); - hfilePerRegion = Math.max(1, hfilePerRegion); - - System.out.println(nRegion + " regions"); - System.out.println(gbPerRegion + " GB per region"); - System.out.println(hfilePerRegion + " hfile per region"); - - Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); - SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class); - int hfileCountInOneRegion = 0; - for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { - hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get()); - if (++hfileCountInOneRegion >= hfilePerRegion) { - Text key = gbPoints.get(i); - outputValue.set(i); - System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); - context.write(key, outputValue); - - hfileCountInOneRegion = 0; - } + protected void doCleanup(Context context) throws IOException, InterruptedException { + int nRegion = Math.round((float) gbPoints.size() / cut); + nRegion = Math.max(minRegionCount, nRegion); + nRegion = Math.min(maxRegionCount, nRegion); + + int gbPerRegion = gbPoints.size() / nRegion; + gbPerRegion = Math.max(1, gbPerRegion); + + if (hfileSizeGB <= 0) { + hfileSizeGB = gbPerRegion; + } + int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB); + hfilePerRegion = Math.max(1, hfilePerRegion); + + System.out.println(nRegion + " regions"); + System.out.println(gbPerRegion + " GB per region"); + System.out.println(hfilePerRegion + " hfile per region"); + + Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); + SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class); + int hfileCountInOneRegion = 0; + for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { + hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get()); + if (++hfileCountInOneRegion >= hfilePerRegion) { + Text key = gbPoints.get(i); + outputValue.set(i); + System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); + context.write(key, outputValue); + + hfileCountInOneRegion = 0; } - hfilePartitionWriter.close(); - } catch (Throwable ex) { - logger.error("", ex); } + hfilePartitionWriter.close(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java index 86104e2..b02183a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java @@ -31,15 +31,11 @@ import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.storage.hbase.util.Results; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** */ public class RowValueDecoder implements Cloneable { - private static final Logger logger = LoggerFactory.getLogger(RowValueDecoder.class); - private final HBaseColumnDesc hbaseColumn; private final byte[] hbaseColumnFamily; private final byte[] hbaseColumnQualifier;