This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
     new b6cf305  KYLIN-4090 Use mapPartition operator to optimize build cube step
b6cf305 is described below

commit b6cf305c7eeee1ed8c7c16004b89323115a2c5fb
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Wed Jul 17 19:55:37 2019 +0800

    KYLIN-4090 Use mapPartition operator to optimize build cube step
---
 .../kylin/engine/flink/FlinkCubingByLayer.java     | 115 ++++++++++++++++++++-
 1 file changed, 111 insertions(+), 4 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 9e6f86f..b39aa1e 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.Options;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -173,8 +174,8 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
 
         DataSet<String[]> hiveDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
 
-        DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.map(
-                new EncodeBaseCuboidMapFunction(cubeName, segmentId, metaUrl, sConf));
+        DataSet<Tuple2<ByteArray, Object[]>> encodedBaseDataSet = hiveDataSet.mapPartition(
+                new EncodeBaseCuboidMapPartitionFunction(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isFlinkSanityCheckEnabled()) {
@@ -197,10 +198,10 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
 
         sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
 
-        CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
+        CuboidMapPartitionFunction mapPartitionFunction = new CuboidMapPartitionFunction(cubeName, segmentId, metaUrl, sConf);
 
         for (level = 1; level <= totalLevels; level++) {
-            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduceGroup(reducerFunction);
+            allDataSets[level] = allDataSets[level - 1].mapPartition(mapPartitionFunction).groupBy(0).reduceGroup(reducerFunction);
             if (envConfig.isFlinkSanityCheckEnabled()) {
                 sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -322,6 +323,53 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
+    /**
+     * A map partition function used to encode the base cuboid.
+     */
+    private static class EncodeBaseCuboidMapPartitionFunction extends RichMapPartitionFunction<String[], Tuple2<ByteArray, Object[]>> {
+
+        private BaseCuboidBuilder baseCuboidBuilder = null;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+
+        public EncodeBaseCuboidMapPartitionFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                CubeDesc cubeDesc = cubeInstance.getDescriptor();
+                CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+                CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich(
+                        EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+                long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+                Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
+                baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                        AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                        MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+            }
+        }
+
+        @Override
+        public void mapPartition(Iterable<String[]> rowArrays, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            for (String[] rowArray : rowArrays) {
+                baseCuboidBuilder.resetAggrs();
+                byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+                Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+                collector.collect(new Tuple2<>(new ByteArray(rowKey), result));
+            }
+        }
+    }
+
     private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
 
         protected String cubeName;
@@ -469,6 +517,65 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     /**
+     * A map partition function which extracts a cuboid's children cuboids and emit them to the down stream.
+     */
+    private static class CuboidMapPartitionFunction extends RichMapPartitionFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private CubeSegment cubeSegment;
+        private CubeDesc cubeDesc;
+        private NDCuboidBuilder ndCuboidBuilder;
+        private RowKeySplitter rowKeySplitter;
+        private SerializableConfiguration conf;
+
+        public CuboidMapPartitionFunction(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+                this.cubeDesc = cubeInstance.getDescriptor();
+                this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+                this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+            }
+        }
+
+        @Override
+        public void mapPartition(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                byte[] key = item.f0.array();
+                long cuboidId = rowKeySplitter.parseCuboid(key);
+                final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+
+                // if still empty or null, skip this record only ('return' would drop the rest of the partition)
+                if (myChildren == null || myChildren.size() == 0) {
+                    continue;
+                }
+                rowKeySplitter.split(key);
+                final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+
+                for (Long child : myChildren) {
+                    Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+                    ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
+                            rowKeySplitter.getSplitBuffers());
+
+                    collector.collect(new Tuple2<>(result, item.f1));
+                }
+            }
+        }
+    }
+
+    /**
      * A flatmap function which extracts a cuboid's children cuboids and emit them to the down stream.
      */
     private static class CuboidFlatMapFunction extends RichFlatMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
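
For readers new to the operator, here is a minimal, self-contained sketch of the pattern this commit applies (illustrative only; the class names MapPartitionSketch and PrefixEncoder are invented for the example and do not exist in Kylin): expensive setup lives in open(), which Flink invokes once per parallel task, while mapPartition() receives the entire partition as one Iterable, so the framework makes a single function call per partition instead of one call per record as map() and flatMap() do. One subtlety of the pattern: skipping a record inside the partition loop must use continue rather than return, because return ends processing for the whole partition.

    import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class MapPartitionSketch {

        // Partition-scoped function: open() runs once per task; mapPartition()
        // sees the whole partition, so per-record call overhead is amortized.
        private static class PrefixEncoder extends RichMapPartitionFunction<String, String> {

            private transient StringBuilder buffer; // stand-in for a heavyweight encoder

            @Override
            public void open(Configuration parameters) {
                buffer = new StringBuilder(); // one-time setup per parallel task
            }

            @Override
            public void mapPartition(Iterable<String> records, Collector<String> out) {
                for (String record : records) {
                    if (record.isEmpty()) {
                        continue; // 'return' here would silently drop the rest of the partition
                    }
                    buffer.setLength(0);
                    out.collect(buffer.append("encoded:").append(record).toString());
                }
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> input = env.fromElements("a", "", "c");
            input.mapPartition(new PrefixEncoder()).print(); // prints encoded:a and encoded:c
        }
    }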