Repository: kylin
Updated Branches:
  refs/heads/yang21 80018874c -> d3ecb0d9c


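Refine mapper and reducer logging

Move per-record progress logging into KylinMapper.map and KylinReducer.reduce, which now log the ordinal of every NORMAL_RECORD_LOG_THRESHOLD-th key. Drop the per-subclass record counters, and have CuboidReducer and InMemCuboidReducer log per-value ordinals instead.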


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3ecb0d9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3ecb0d9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3ecb0d9

Branch: refs/heads/yang21
Commit: d3ecb0d9c381dbb035c7cada7d3c798e24fef1d1
Parents: 8001887
Author: Hongbin Ma <mahong...@apache.org>
Authored: Thu Dec 1 18:01:55 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Thu Dec 1 18:01:55 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/KylinMapper.java | 17 +++++++---
 .../apache/kylin/engine/mr/KylinReducer.java    | 17 +++++++---
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  1 -
 .../kylin/engine/mr/steps/CuboidReducer.java    | 21 ++++++------
 .../engine/mr/steps/HiveToBaseCuboidMapper.java | 10 ++----
 .../engine/mr/steps/InMemCuboidMapper.java      | 34 ++++++++------------
 .../engine/mr/steps/InMemCuboidReducer.java     | 20 ++++++------
 .../kylin/engine/mr/steps/NDCuboidMapper.java   | 22 ++++++-------
 8 files changed, 70 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index a01f7a2..c5af2fe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -18,18 +18,21 @@
 
 package org.apache.kylin.engine.mr;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  */
 public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends 
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = 
LoggerFactory.getLogger(KylinMapper.class);
 
+    protected int mapCounter = 0;
+
     protected void bindCurrentConfiguration(Configuration conf) {
         logger.info("The conf for current mapper will be " + 
System.identityHashCode(conf));
         HadoopUtil.setCurrentConfiguration(conf);
@@ -38,6 +41,10 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Mapper<KEYIN,
     @Override
     final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, 
KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
         try {
+            if (mapCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 
0) {
+                logger.info("Accepting Mapper Key with ordinal: " + 
mapCounter);
+            }
+
             doMap(key, value, context);
         } catch (IOException ex) { // KYLIN-2170
             logger.error("", ex);
@@ -53,11 +60,11 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Mapper<KEYIN,
             throw ex;
         }
     }
-    
+
     protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, 
KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
         super.map(key, value, context);
     }
-    
+
     @Override
     final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT>.Context context) throws IOException, InterruptedException {
         try {
@@ -76,7 +83,7 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Mapper<KEYIN,
             throw ex;
         }
     }
-    
+
     protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 
context) throws IOException, InterruptedException {
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 2b63ce0..83266ea 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -18,18 +18,22 @@
 
 package org.apache.kylin.engine.mr;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  */
 public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = 
LoggerFactory.getLogger(KylinReducer.class);
-    
+
+    protected int reduceCounter = 0;
+
+
     protected void bindCurrentConfiguration(Configuration conf) {
         HadoopUtil.setCurrentConfiguration(conf);
     }
@@ -37,6 +41,9 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Reducer<KEYI
     @Override
     final public void reduce(KEYIN key, Iterable<VALUEIN> values, 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, 
InterruptedException {
         try {
+            if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD 
== 0) {
+                logger.info("Accepting Reducer Key with ordinal: " + 
reduceCounter);
+            }
             doReduce(key, values, context);
         } catch (IOException ex) { // KYLIN-2170
             logger.error("", ex);
@@ -52,11 +59,11 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Reducer<KEYI
             throw ex;
         }
     }
-    
+
     protected void doReduce(KEYIN key, Iterable<VALUEIN> values, 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, 
InterruptedException {
         super.reduce(key, values, context);
     }
-    
+
     @Override
     final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT>.Context context) throws IOException, InterruptedException {
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index dd0a031..2ad5f53 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -69,7 +69,6 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> 
extends KylinMapper<K
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected String intermediateTableRowDelimiter;
     protected byte byteRowDelimiter;
-    protected int counter;
     protected MeasureIngester<?>[] aggrIngesters;
     protected Map<TblColRef, Dictionary<String>> dictionaryMap;
     protected Object[] measures;

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 9543f0a..03c925e 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -18,10 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeManager;
@@ -35,9 +31,12 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
 /**
  * @author George Song (ysong1)
- * 
  */
 public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
 
@@ -50,7 +49,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
     private BufferedMeasureCodec codec;
     private MeasureAggregators aggs;
 
-    private int counter;
+    private int vcounter = 0;
     private int cuboidLevel;
     private boolean[] needAggr;
     private Object[] input;
@@ -90,12 +89,18 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
         aggs.reset();
 
         for (Text value : values) {
+
+            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+                logger.info("Handling value with ordinal: " + vcounter + "!");
+            }
+
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), input);
             if (cuboidLevel > 0) {
                 aggs.aggregate(input, needAggr);
             } else {
                 aggs.aggregate(input);
             }
+
         }
         aggs.collectStates(result);
 
@@ -104,10 +109,6 @@ public class CuboidReducer extends KylinReducer<Text, 
Text, Text, Text> {
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(key, outputValue);
 
-        counter++;
-        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index d9c5312..f4e8af7 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -18,11 +18,10 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import java.io.IOException;
 
 /**
  * @author George Song (ysong1)
@@ -39,11 +38,6 @@ public class HiveToBaseCuboidMapper<KEYIN> extends 
BaseCuboidMapperBase<KEYIN, O
 
     @Override
     public void doMap(KEYIN key, Object value, Context context) throws 
IOException, InterruptedException {
-        counter++;
-        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
         try {
             //put a record into the shared bytesSplitter
             String[] row = flatTableInputFormat.parseMapperInput(value);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 15bfd2e..cf5abaf 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -18,18 +18,7 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
@@ -51,7 +40,17 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -64,7 +63,6 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
     private CubeSegment cubeSegment;
     private IMRTableInputFormat flatTableInputFormat;
 
-    private int counter;
     private BlockingQueue<List<String>> queue = new 
ArrayBlockingQueue<List<String>>(64);
     private Future<?> future;
 
@@ -120,10 +118,6 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
 
         while (!future.isDone()) {
             if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
-                counter++;
-                if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) 
{
-                    logger.info("Handled " + counter + " records!");
-                }
                 break;
             }
         }
@@ -131,10 +125,10 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
 
     @Override
     protected void doCleanup(Context context) throws IOException, 
InterruptedException {
-        logger.info("Totally handled " + counter + " records!");
+        logger.info("Totally handled " + mapCounter + " records!");
 
         while (!future.isDone()) {
-            if (queue.offer(Collections.<String> emptyList(), 1, 
TimeUnit.SECONDS)) {
+            if (queue.offer(Collections.<String>emptyList(), 1, 
TimeUnit.SECONDS)) {
                 break;
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index d0a7062..a57ddb8 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -18,10 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -37,6 +33,10 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
 /**
  */
 public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, 
ByteArrayWritable, Object, Object> {
@@ -46,7 +46,7 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
     private BufferedMeasureCodec codec;
     private MeasureAggregators aggs;
 
-    private int counter;
+    private int vcounter;
     private Object[] input;
     private Object[] result;
 
@@ -74,10 +74,14 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
 
     @Override
     public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> 
values, Context context) throws IOException, InterruptedException {
-
         aggs.reset();
 
         for (ByteArrayWritable value : values) {
+
+            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+                logger.info("Handling value with ordinal: " + vcounter);
+            }
+
             codec.decode(value.asBuffer(), input);
             aggs.aggregate(input);
         }
@@ -92,10 +96,6 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
 
         context.write(outputKey, outputValue);
 
-        counter++;
-        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 8107e52..54d9e23 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -18,9 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.Collection;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
@@ -41,9 +38,11 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+
 /**
  * @author George Song (ysong1)
- * 
  */
 public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
@@ -97,9 +96,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, 
Text, Text> {
         int index = rowKeySplitter.getBodySplitOffset(); // skip shard and 
cuboidId
         for (int i = 0; i < parentCuboidIdActualLength; i++) {
             if ((mask & parentCuboidId) > 0) {// if the this bit position 
equals
-                                                  // 1
+                // 1
                 if ((mask & childCuboidId) > 0) {// if the child cuboid has 
this
-                                                     // column
+                    // column
                     System.arraycopy(splitBuffers[index].value, 0, 
newKeyBodyBuf, offset, splitBuffers[index].length);
                     offset += splitBuffers[index].length;
                 }
@@ -123,24 +122,21 @@ public class NDCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
     public void doMap(Text key, Text value, Context context) throws 
IOException, InterruptedException {
         long cuboidId = rowKeySplitter.split(key.getBytes());
         Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
-
         Collection<Long> myChildren = 
cuboidScheduler.getSpanningCuboid(cuboidId);
 
         // if still empty or null
         if (myChildren == null || myChildren.size() == 0) {
             context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, 
"Skipped records").increment(1L);
-            skipCounter++;
-            if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) 
{
-                logger.info("Skipped " + skipCounter + " records!");
+            if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 
0) {
+                logger.info("Skipping record with ordinal " + skipCounter);
             }
             return;
         }
 
         context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, 
"Processed records").increment(1L);
 
-        handleCounter++;
-        if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + handleCounter + " records!");
+        if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) 
{
+            logger.info("Handling record with ordinal: " + handleCounter);
         }
 
         for (Long child : myChildren) {

Reply via email to