This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new b6cf305  KYLIN-4090 Use mapPartition operator to optimize build cube step
b6cf305 is described below

commit b6cf305c7eeee1ed8c7c16004b89323115a2c5fb
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Wed Jul 17 19:55:37 2019 +0800

    KYLIN-4090 Use mapPartition operator to optimize build cube step
---
 .../kylin/engine/flink/FlinkCubingByLayer.java     | 115 ++++++++++++++++++++-
 1 file changed, 111 insertions(+), 4 deletions(-)
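
For context on the pattern this patch applies: Flink's mapPartition hands a RichMapPartitionFunction an entire partition as an Iterable, so per-task state built in open() is reused across all records of that partition instead of going through the per-record map/flatMap path. The sketch below is illustrative only and is not part of the commit; the record format, class names, and parsing logic are hypothetical.

    // Illustrative sketch (not from commit b6cf305): a generic Flink DataSet job
    // showing the map -> mapPartition pattern. Names and data are made up.
    import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class MapPartitionSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> records = env.fromElements("a,1", "b,2", "c,3");

            // mapPartition processes a whole partition per call, so state created in
            // open() is built once per parallel task rather than touched per record.
            DataSet<Tuple2<String, Integer>> encoded = records.mapPartition(new EncodePartition());
            encoded.print();
        }

        private static class EncodePartition
                extends RichMapPartitionFunction<String, Tuple2<String, Integer>> {

            // Stands in for heavyweight per-task state (the commit builds cuboid
            // builders and row-key encoders here).
            private transient StringBuilder keyBuffer;

            @Override
            public void open(Configuration parameters) {
                keyBuffer = new StringBuilder();
            }

            @Override
            public void mapPartition(Iterable<String> partition,
                                     Collector<Tuple2<String, Integer>> out) {
                for (String record : partition) {
                    String[] parts = record.split(",");
                    keyBuffer.setLength(0);
                    keyBuffer.append(parts[0]);
                    out.collect(new Tuple2<>(keyBuffer.toString(), Integer.parseInt(parts[1])));
                }
            }
        }
    }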

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 9e6f86f..b39aa1e 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.Options;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -173,8 +174,8 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
 
         DataSet<String[]> hiveDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
 
-        DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.map(
-                new EncodeBaseCuboidMapFunction(cubeName, segmentId, metaUrl, sConf));
+        DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.mapPartition(
+                new EncodeBaseCuboidMapPartitionFunction(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isFlinkSanityCheckEnabled()) {
@@ -197,10 +198,10 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
 
         sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
 
-        CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
+        CuboidMapPartitionFunction mapPartitionFunction = new CuboidMapPartitionFunction(cubeName, segmentId, metaUrl, sConf);
 
         for (level = 1; level <= totalLevels; level++) {
-            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduceGroup(reducerFunction);
+            allDataSets[level] = allDataSets[level - 1].mapPartition(mapPartitionFunction).groupBy(0).reduceGroup(reducerFunction);
             if (envConfig.isFlinkSanityCheckEnabled()) {
                 sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -322,6 +323,53 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
+    /**
+     * A map partition function used to encode the base cuboid.
+     */
+    private static class EncodeBaseCuboidMapPartitionFunction extends RichMapPartitionFunction<String[], Tuple2<ByteArray, Object[]>> {
+
+        private BaseCuboidBuilder baseCuboidBuilder = null;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+
+        public EncodeBaseCuboidMapPartitionFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+                CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+                        EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+                long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+                baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                        AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                        MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+            }
+        }
+
+        @Override
+        public void mapPartition(Iterable<String[]> rowArrays, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            for (String[] rowArray : rowArrays) {
+                baseCuboidBuilder.resetAggrs();
+                byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+                Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+                collector.collect(new Tuple2<>(new ByteArray(rowKey), result));
+            }
+        }
+    }
+
     private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
 
         protected String cubeName;
@@ -469,6 +517,65 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     /**
+     * A map partition function which extracts a cuboid's children cuboids and emit them to the down stream.
+     */
+    private static class CuboidMapPartitionFunction extends RichMapPartitionFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private CubeSegment cubeSegment;
+        private CubeDesc cubeDesc;
+        private NDCuboidBuilder ndCuboidBuilder;
+        private RowKeySplitter rowKeySplitter;
+        private SerializableConfiguration conf;
+
+        public CuboidMapPartitionFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+                this.cubeDesc = cubeInstance.getDescriptor();
+                this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+                this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+            }
+        }
+
+        @Override
+        public void mapPartition(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                byte[] key = item.f0.array();
+                long cuboidId = rowKeySplitter.parseCuboid(key);
+                final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+
+                // skip rows whose cuboid has no children; continue (not return) so the rest of the partition is still processed
+                if (myChildren == null || myChildren.size() == 0) {
+                    continue;
+                }
+                rowKeySplitter.split(key);
+                final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+
+                for (Long child : myChildren) {
+                    Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+                    ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
+                            rowKeySplitter.getSplitBuffers());
+
+                    collector.collect(new Tuple2<>(result, item.f1));
+                }
+            }
+        }
+    }
+
+    /**
      * A flatmap function which extracts a cuboid's children cuboids and emit them to the down stream.
      */
     private static class CuboidFlatMapFunction extends RichFlatMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
