This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit e220b9a20c4eae09c8c240c066b730595eb03693 Author: Ma,Gang <ga...@ebay.com> AuthorDate: Fri May 22 10:28:03 2020 +0800 KYLIN-4508 Add more unit tests for MR module Signed-off-by: shaofengshi <shaofeng...@apache.org> --- engine-mr/pom.xml | 7 + .../kylin/engine/mr/common/JobInfoConverter.java | 20 ++- .../kylin/engine/mr/ByteArrayWritableTest.java | 91 ++++++++++ .../engine/mr/common/CubeStatsReaderTest.java | 45 +++++ .../engine/mr/common/CubeStatsWriterTest.java | 105 +++++++++++ .../engine/mr/common/JobInfoConverterTest.java | 77 +++++++- .../CalculateStatsFromBaseCuboidMapperTest.java | 131 ++++++++++++++ .../mr/steps/FactDistinctColumnsReducerTest.java | 185 ++++++++++++++++++- .../engine/mr/steps/InMemCuboidMapperTest.java | 197 +++++++++++++++++++++ .../engine/mr/streaming/RowRecordReaderTest.java | 150 ++++++++++++++++ .../198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq | Bin 0 -> 1253 bytes kylin-it/pom.xml | 6 + .../mr/steps/FactDistinctColumnsMapperTest.java | 98 ++++++++++ pom.xml | 2 +- 14 files changed, 1102 insertions(+), 12 deletions(-) diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml index 7a0bc4a..fffd098 100644 --- a/engine-mr/pom.xml +++ b/engine-mr/pom.xml @@ -119,6 +119,13 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4-rule-agent</artifactId> + <!-- power mock should be compatible with mrunit--> + <version>1.5.4</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.kylin</groupId> <artifactId>kylin-stream-core</artifactId> <type>test-jar</type> diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index 9c19065..ef16399 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@ -61,6 +61,7 @@ public 
class JobInfoConverter { } } + public static JobInstance parseToJobInstance(CubingJob job, Map<String, Output> outputs) { if (job == null) { logger.warn("job is null."); @@ -73,15 +74,24 @@ public class JobInfoConverter { return null; } - CubingJob cubeJob = (CubingJob) job; - CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()) - .getCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); + CubingJob cubeJob = job; + String cubeName = CubingExecutableUtil.getCubeName(cubeJob.getParams()); + String displayCubeName = cubeName; + try { + CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + if (cube != null) { + cubeName = cube.getName(); + displayCubeName = cube.getDisplayName(); + } + } catch (Exception e) { + logger.warn("Fail to get cube instance for {}.", cubeName); + } final JobInstance result = new JobInstance(); result.setName(job.getName()); result.setProjectName(cubeJob.getProjectName()); - result.setRelatedCube(cube != null ? cube.getName() : CubingExecutableUtil.getCubeName(cubeJob.getParams())); - result.setDisplayCubeName(cube != null ? cube.getDisplayName() : CubingExecutableUtil.getCubeName(cubeJob.getParams())); + result.setRelatedCube(cubeName); + result.setDisplayCubeName(displayCubeName); result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); result.setRelatedSegmentName(CubingExecutableUtil.getSegmentName(cubeJob.getParams())); result.setLastModified(output.getLastModified()); diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java new file mode 100644 index 0000000..2630b36 --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class ByteArrayWritableTest { + + @Test + public void basicTest() { + ByteArrayWritable byteArrayWritable = new ByteArrayWritable(5); + Assert.assertEquals(5, byteArrayWritable.length()); + Assert.assertNotEquals(0, byteArrayWritable.hashCode()); + byteArrayWritable.set(new byte[] { 0x1, 0x2, 0x3 }); + + Assert.assertArrayEquals(new byte[] { 1, 2, 3 }, byteArrayWritable.array()); + Assert.assertEquals(0, byteArrayWritable.offset()); + Assert.assertEquals(3, byteArrayWritable.length()); + + Assert.assertEquals("01 02 03", byteArrayWritable.toString()); + + ByteArrayWritable byteArrayWritableNull = new ByteArrayWritable(null); + Assert.assertEquals(0, byteArrayWritableNull.hashCode()); + } + + @Test + public void testCompare() { + ByteArrayWritable b1 = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 }); + ByteArrayWritable b2 = new ByteArrayWritable(); + + Assert.assertFalse(b1.equals(1)); + + b2.set(new byte[] { 0x1, 0x2, 0x3 }); + + 
Assert.assertTrue(b1.equals(b2)); + Assert.assertTrue(b1.equals(new byte[] { 1, 2, 3 })); + } + + @Test() + public void testIO() throws IOException { + ByteArrayWritable byteArrayWritable = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 }); + ByteArrayWritable byteArrayWritableNull = new ByteArrayWritable(null); + ByteArrayWritable byteArrayWritableSlice = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 }, 1, 2); + + byteArrayWritable.asBuffer(); + byteArrayWritableNull.asBuffer(); + byteArrayWritableSlice.asBuffer(); + + OutputStream outputStream = new OutputStream() { + @Override + public void write(int b) throws IOException { + } + }; + DataOutput output = new DataOutputStream(outputStream); + byteArrayWritable.write(output); + + InputStream inputStream = new InputStream() { + @Override + public int read() throws IOException { + return 0; + } + }; + DataInput input = new DataInputStream(inputStream); + byteArrayWritableNull.readFields(input); + } +} \ No newline at end of file diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java new file mode 100644 index 0000000..9eaafab --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.io.IOException; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class CubeStatsReaderTest extends LocalFileMetadataTestCase { + @Before + public void setup() throws Exception { + createTestMetadata(); + } + + @After + public void after() throws Exception { + cleanupTestMetadata(); + } + + @Test + public void testReadAndPrint() throws IOException { + String[] args = new String[] { "test_kylin_cube_with_slr_1_new_segment" }; + CubeStatsReader.main(args); + } + +} diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java new file mode 100644 index 0000000..4e0f100 --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class CubeStatsWriterTest extends LocalFileMetadataTestCase { + private CubeInstance cube; + private CubeSegment cubeSegment; + + private final String segmentId = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b"; + + @Before + public void setup() throws Exception { + File tmpFolder = getTempFolder(); + FileUtils.deleteDirectory(tmpFolder); + tmpFolder.mkdirs(); + createTestMetadata(); + cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getCube("test_kylin_cube_with_slr_1_new_segment"); + cubeSegment = cube.getSegmentById(segmentId); + } + + @After + public void after() throws Exception { + File tmpFolder = getTempFolder(); + FileUtils.deleteDirectory(tmpFolder); + cleanupTestMetadata(); + } + + @Test + public void testWrite() throws IOException { + Configuration conf = 
HadoopUtil.getCurrentConfiguration(); + conf.set("fs.defaultFS", "file:///"); + conf.set("mapreduce.framework.name", "local"); + conf.set("mapreduce.application.framework.path", ""); + conf.set("fs.file.impl.disable.cache", "true"); + + final Path outputPath = new Path(getTmpFolderPath(), segmentId); + + System.out.println(outputPath); + Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); + + Set<Long> allCuboids = cube.getDescriptor().getAllCuboids(); + for (Long cuboid : allCuboids) { + cuboidHLLMap.put(cuboid, createMockHLLCounter()); + } + CubeStatsWriter.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100); + assertTrue(new File(outputPath.toString(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME).exists()); + } + + private HLLCounter createMockHLLCounter() { + HLLCounter one = new HLLCounter(14); + for (int i = 0; i < 100; i++) { + one.clear(); + one.add(i); + } + return one; + } + + private File getTempFolder() { + return new File(getTmpFolderPath()); + } + + private String getTmpFolderPath() { + return "_tmp_cube_statistics"; + } +} diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java index ffc223d..a54e112 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java @@ -25,15 +25,19 @@ import static org.junit.Assert.assertTrue; import java.util.Map; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.constant.JobStatusEnum; import 
org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.CheckpointExecutable; +import org.apache.kylin.job.execution.DefaultOutput; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.ExecuteResult; @@ -59,6 +63,67 @@ public class JobInfoConverterTest { assertEquals(step.getStatus(), JobStepStatusEnum.FINISHED); } + @Test + public void testParseToJobInstance4CuboidJob() { + TestJob task = new TestJob(); + String jobId = UUID.randomUUID().toString(); + String cubeName = "cube1"; + task.setId(jobId); + task.setParam(CubingExecutableUtil.CUBE_NAME, cubeName); + Map<String, Output> outPutMap = Maps.newHashMap(); + DefaultOutput executeOutput = new DefaultOutput(); + executeOutput.setState(ExecutableState.READY); + Map<String, String> extraMap = Maps.newHashMap(); + executeOutput.setExtra(extraMap); + outPutMap.put(jobId, executeOutput); + + JobInstance instance3 = JobInfoConverter.parseToJobInstanceQuietly(task, outPutMap); + // no exception thrown is expected + assertEquals(jobId, instance3.getId()); + assertEquals(CubeBuildTypeEnum.BUILD, instance3.getType()); + assertEquals(cubeName, instance3.getRelatedCube()); + assertEquals(JobStatusEnum.PENDING, instance3.getStatus()); + } + + @Test + public void testParseToJobInstance4CheckpointJob() { + Test2Job task = new Test2Job(); + String jobId = UUID.randomUUID().toString(); + String cubeName = "cube1"; + task.setId(jobId); + task.setParam(CubingExecutableUtil.CUBE_NAME, cubeName); + Map<String, Output> outPutMap = Maps.newHashMap(); + DefaultOutput executeOutput = new DefaultOutput(); + executeOutput.setState(ExecutableState.READY); + Map<String, String> extraMap = Maps.newHashMap(); + executeOutput.setExtra(extraMap); + outPutMap.put(jobId, executeOutput); + + JobInstance instance3 = JobInfoConverter.parseToJobInstanceQuietly(task, 
outPutMap); + // no exception thrown is expected + assertEquals(jobId, instance3.getId()); + assertEquals(CubeBuildTypeEnum.CHECKPOINT, instance3.getType()); + assertEquals(cubeName, instance3.getRelatedCube()); + assertEquals(JobStatusEnum.PENDING, instance3.getStatus()); + } + + @Test + public void testStatusConvert() { + assertEquals(JobStatusEnum.PENDING, JobInfoConverter.parseToJobStatus(ExecutableState.READY)); + assertEquals(JobStatusEnum.RUNNING, JobInfoConverter.parseToJobStatus(ExecutableState.RUNNING)); + assertEquals(JobStatusEnum.DISCARDED, JobInfoConverter.parseToJobStatus(ExecutableState.DISCARDED)); + assertEquals(JobStatusEnum.ERROR, JobInfoConverter.parseToJobStatus(ExecutableState.ERROR)); + assertEquals(JobStatusEnum.STOPPED, JobInfoConverter.parseToJobStatus(ExecutableState.STOPPED)); + assertEquals(JobStatusEnum.FINISHED, JobInfoConverter.parseToJobStatus(ExecutableState.SUCCEED)); + + assertEquals(JobStepStatusEnum.PENDING, JobInfoConverter.parseToJobStepStatus(ExecutableState.READY)); + assertEquals(JobStepStatusEnum.RUNNING, JobInfoConverter.parseToJobStepStatus(ExecutableState.RUNNING)); + assertEquals(JobStepStatusEnum.DISCARDED, JobInfoConverter.parseToJobStepStatus(ExecutableState.DISCARDED)); + assertEquals(JobStepStatusEnum.ERROR, JobInfoConverter.parseToJobStepStatus(ExecutableState.ERROR)); + assertEquals(JobStepStatusEnum.STOPPED, JobInfoConverter.parseToJobStepStatus(ExecutableState.STOPPED)); + assertEquals(JobStepStatusEnum.FINISHED, JobInfoConverter.parseToJobStepStatus(ExecutableState.SUCCEED)); + } + public static class TestJob extends CubingJob { public TestJob() { super(); @@ -70,6 +135,17 @@ public class JobInfoConverterTest { } } + public static class Test2Job extends CheckpointExecutable { + public Test2Job() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + return new ExecuteResult(ExecuteResult.State.SUCCEED, ""); + } + } + public static class 
TestOutput implements Output { @Override @@ -229,5 +305,4 @@ public class JobInfoConverterTest { assertNull(jobInstance); } - } diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java new file mode 100644 index 0000000..a1416a8 --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.mrunit.types.Pair; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.primitives.Bytes; + +public class CalculateStatsFromBaseCuboidMapperTest extends LocalFileMetadataTestCase { + private String cubeName; + private CubeInstance cube; + private CubeDesc cubeDesc; + private MapDriver<Text, Text, Text, Text> mapDriver; + + @Before + public void setup() throws Exception { + createTestMetadata(); + FileUtils.deleteDirectory(new File("./meta")); + FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta")); + + cubeName = "test_kylin_cube_with_slr_1_new_segment"; + cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + cubeDesc = cube.getDescriptor(); + CalculateStatsFromBaseCuboidMapper calStatsFromBasicCuboidMapper = new CalculateStatsFromBaseCuboidMapper(); + mapDriver = MapDriver.newMapDriver(calStatsFromBasicCuboidMapper); + } + + @After + public void after() throws Exception { + 
cleanupTestMetadata(); + } + + @Test + public void testMapper() throws Exception { + mapperTest(); + } + + @Test + public void testMapperWithOldHLL() throws Exception { + cubeDesc.setVersion("1.5.2"); + mapperTest(); + } + + private void mapperTest() throws Exception { + setConfiguration(); + Text key1 = new Text(); + byte[] shard = new byte[] { 0, 0 }; + byte[] cuboidId = Cuboid.getBaseCuboid(cubeDesc).getBytes(); + byte[] col1 = new byte[] { 0, 0, 0, 1 }; + byte[] col2 = new byte[] { 0, 6, 0 }; + byte[] col3 = new byte[] { 1 }; + byte[] col4 = new byte[] { 1 }; + byte[] col5 = new byte[] { 1 }; + byte[] col6 = new byte[] { 1 }; + byte[] col7 = new byte[] { 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0 }; + byte[] col8 = new byte[] { 1, 0 }; + byte[] col9 = new byte[] { 1 }; + key1.set(Bytes.concat(shard, cuboidId, col1, col2, col3, col4, col5, col6, col7, col8, col9)); + Text val1 = new Text(); + + mapDriver.setInput(key1, val1); + List<Pair<Text, Text>> result = mapDriver.run(); + Set<Long> cuboidIdSet = cube.getCuboidsByMode(CuboidModeEnum.CURRENT); + assertEquals(cuboidIdSet.size(), result.size()); + Long[] cuboids = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]); + Arrays.sort(cuboids); + + List<Long> resultCuboidList = Lists.transform(result, new Function<Pair<Text, Text>, Long>() { + @Nullable + @Override + public Long apply(@Nullable Pair<Text, Text> input) { + byte[] bytes = input.getFirst().getBytes(); + return org.apache.kylin.common.util.Bytes.toLong(bytes); + } + }); + Long[] resultCuboids = resultCuboidList.toArray(new Long[resultCuboidList.size()]); + Arrays.sort(resultCuboids); + assertArrayEquals(cuboids, resultCuboids); + } + + private void setConfiguration() throws Exception { + Configuration configuration = mapDriver.getConfiguration(); + configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100"); + configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName); + configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
"198va32a-a33e-4b69-83dd-0bb8b1f8c53b"); + } + +} diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java index 7348ce6..9f684c9 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java @@ -14,35 +14,92 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package org.apache.kylin.engine.mr.steps; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayPrimitiveWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.apache.hadoop.mrunit.types.Pair; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; +import 
org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsWriter; +import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.kylin.shaded.com.google.common.collect.ImmutableList; + +public class FactDistinctColumnsReducerTest extends LocalFileMetadataTestCase { + private String cubeName; + private CubeInstance cube; + private CubeDesc cubeDesc; + private ReduceDriver<SelfDefineSortableKey, Text, NullWritable, Text> reduceDriver; + + @Before + public void setup() throws Exception { + createTestMetadata(); + FileUtils.deleteDirectory(new File("./meta")); + FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta")); + + cubeName = "test_kylin_cube_with_slr_1_new_segment"; + cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + cubeDesc = cube.getDescriptor(); + FactDistinctColumnsReducer factDistinctColumnsReducer = new FactDistinctColumnsReducer(); + reduceDriver = ReduceDriver.newReduceDriver(factDistinctColumnsReducer); + } -/** - */ -public class FactDistinctColumnsReducerTest { + @After + public void after() throws Exception { + FileUtils.deleteDirectory(new File("./meta")); + FileUtils.deleteQuietly(new File(FileOutputCommitter.PENDING_DIR_NAME)); + cleanupTestMetadata(); + } @Test public void testWriteCuboidStatistics() throws IOException { final Configuration conf = 
HadoopUtil.getCurrentConfiguration(); File tmp = File.createTempFile("cuboidstatistics", ""); - final Path outputPath = new Path(tmp.getParent().toString() + File.separator + RandomUtil.randomUUID().toString()); + final Path outputPath = new Path(tmp.getParent() + File.separator + RandomUtil.randomUUID().toString()); if (!FileSystem.getLocal(conf).exists(outputPath)) { // FileSystem.getLocal(conf).create(outputPath); } @@ -53,4 +110,122 @@ public class FactDistinctColumnsReducerTest { FileSystem.getLocal(conf).delete(outputPath, true); } + + @Test + public void testReducerStatistics() throws IOException { + setConfigurations(); + setMultipleOutputs(BatchConstants.CFG_OUTPUT_STATISTICS, reduceDriver.getConfiguration(), + SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class); + setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class, + NullWritable.class, LongWritable.class); + + // override the task id + int dimColsSize = cubeDesc.getRowkey().getRowKeyColumns().length; + int uhcSize = cubeDesc.getAllUHCColumns().size(); + final int targetTaskId = (dimColsSize - uhcSize) + uhcSize * cubeDesc.getConfig().getUHCReducerCount(); + + setContextTaskId(targetTaskId); + ByteBuffer tmpBuf = ByteBuffer.allocate(4096); + tmpBuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte + tmpBuf.putLong(100); + Text outputKey1 = new Text(); + outputKey1.set(tmpBuf.array(), 0, tmpBuf.position()); + SelfDefineSortableKey key1 = new SelfDefineSortableKey(); + key1.init(outputKey1, (byte) 0); + + HLLCounter hll = createMockHLLCounter(); + ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + hllBuf.clear(); + hll.writeRegisters(hllBuf); + Text value1 = new Text(); + value1.set(hllBuf.array(), 0, hllBuf.position()); + + reduceDriver.setInput(key1, ImmutableList.of(value1)); + + List<Pair<NullWritable, Text>> result = reduceDriver.run(); + assertEquals(0, 
result.size()); // the reducer outputs statistics info to a sequence file. + } + + @Test + public void testReducerNormalDimDictInReducer() throws IOException { + testNormalDim(); + } + + @Test + public void testReducerNormalDim() throws IOException { + KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata(); + kylinConfig.setProperty("kylin.engine.mr.build-dict-in-reducer", "false"); + testNormalDim(); + } + + private void setContextTaskId(final int taskId) { + Context context = reduceDriver.getContext(); + when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() { + @Override + public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable { + return TaskAttemptID.forName("attempt__0000_r_" + taskId + "_0"); + } + }); + } + + private void setConfigurations() { + Configuration configuration = reduceDriver.getConfiguration(); + configuration.set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_1_new_segment"); + configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b"); + configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100"); + configuration.set(FileOutputFormat.OUTDIR, "."); + } + + // copy from MultipleOutputs for test + private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs"; + private static final String MO_PREFIX = "mapreduce.multipleoutputs.namedOutput."; + private static final String FORMAT = ".format"; + private static final String KEY = ".key"; + private static final String VALUE = ".value"; + + private void setMultipleOutputs(String namedOutput, Configuration conf, + Class<? 
extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass) + throws IOException { + conf.set(MULTIPLE_OUTPUTS, conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput); + conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, OutputFormat.class); + conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class); + conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class); + } + + private HLLCounter createMockHLLCounter() { + HLLCounter hllc = new HLLCounter(14); + HLLCounter one = new HLLCounter(14); + for (int i = 0; i < 1000; i++) { + one.clear(); + one.add(i); + hllc.merge(one); + } + return hllc; + } + + private void testNormalDim() throws IOException { + setConfigurations(); + setMultipleOutputs(BatchConstants.CFG_OUTPUT_COLUMN, reduceDriver.getConfiguration(), + SequenceFileOutputFormat.class, NullWritable.class, Text.class); + setMultipleOutputs(BatchConstants.CFG_OUTPUT_DICT, reduceDriver.getConfiguration(), + SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class); + setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class, + NullWritable.class, LongWritable.class); + + int nDimReducers = cubeDesc.getRowkey().getRowKeyColumns().length; + setContextTaskId(nDimReducers - 1); + + ByteBuffer tmpBuf = ByteBuffer.allocate(4096); + String val = "100"; + tmpBuf.put(Bytes.toBytes(val)); + Text outputKey1 = new Text(); + outputKey1.set(tmpBuf.array(), 0, tmpBuf.position()); + SelfDefineSortableKey key1 = new SelfDefineSortableKey(); + key1.init(outputKey1, (byte) 0); + + reduceDriver.setInput(key1, ImmutableList.of(new Text())); + List<Pair<NullWritable, Text>> result = reduceDriver.run(); + assertEquals(0, result.size()); + } + } diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java new file mode 100644 index 
0000000..612e9ae --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import static org.junit.Assert.assertArrayEquals; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; +import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData; +import org.apache.kylin.engine.mr.ByteArrayWritable; +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; +import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; +import 
org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Drives {@link InMemCuboidMapper} through MRUnit's {@link MapDriver}.
+ *
+ * <p>CuboidSchedulerUtil.getCuboidSchedulerByMode and MRUtil.getBatchCubingInputSide are
+ * stubbed, and construction of {@link DoggedCubeBuilder} is replaced by a mock whose
+ * buildAsRunnable returns a {@link TestHandler} that checks the rows put on the input queue.
+ */
+@PrepareForTest({ MRUtil.class, CuboidSchedulerUtil.class, InMemCuboidMapper.class })
+public class InMemCuboidMapperTest extends LocalFileMetadataTestCase {
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private String cubeName;
+    private CubeInstance cube;
+    private InMemCuboidMapper<NullWritable> inMemCuboidMapper;
+    private MapDriver<NullWritable, Object, ByteArrayWritable, ByteArrayWritable> mapDriver;
+
+    /** Creates the test metadata, mirrors it into ./meta and installs the static stubs. */
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        inMemCuboidMapper = new InMemCuboidMapper<>();
+        mapDriver = MapDriver.newMapDriver(inMemCuboidMapper);
+
+        PowerMockito.stub(PowerMockito.method(CuboidSchedulerUtil.class, "getCuboidSchedulerByMode", CubeSegment.class,
+                String.class)).toReturn(cube.getCuboidScheduler());
+        IMRBatchCubingInputSide mockInputSide = createMockInputSide();
+        PowerMockito.stub(PowerMockito.method(MRUtil.class, "getBatchCubingInputSide")).toReturn(mockInputSide);
+
+    }
+
+    /** Cleans up the test metadata and always removes the ./meta copy made by {@link #setup()}. */
+    @After
+    public void after() throws Exception {
+        try {
+            cleanupTestMetadata();
+        } finally {
+            // remove the copied metadata even when the metadata cleanup itself fails
+            FileUtils.deleteDirectory(new File("./meta"));
+        }
+    }
+
+    /** Runs the mapper on one input record; the handler expects the mock input row on the queue. */
+    @Test
+    public void testMapper() throws Exception {
+        TestHandler testHandler = new TestHandler();
+        setConfigurationAndMock(testHandler);
+
+        mapDriver.setInput(NullWritable.get(), NullWritable.get());
+        mapDriver.run();
+    }
+
+    /**
+     * Same as {@link #testMapper()} but with the mapper's internal "splitRowThreshold" and
+     * "unitRows" fields forced to 1; the handler then expects CUT_ROW right after the first row.
+     */
+    @Test
+    public void testMapperWithCutRow() throws Exception {
+        Whitebox.setInternalState(inMemCuboidMapper, "splitRowThreshold", 1);
+        Whitebox.setInternalState(inMemCuboidMapper, "unitRows", 1);
+        TestHandler testHandler = new TestWithCutRowHandler();
+        setConfigurationAndMock(testHandler);
+
+        mapDriver.setInput(NullWritable.get(), NullWritable.get());
+        mapDriver.run();
+    }
+
+    /**
+     * Fills the driver configuration (sampling percent, cube name, segment id) and makes every
+     * {@code new DoggedCubeBuilder(...)} return a mock wired to the given handler.
+     */
+    private void setConfigurationAndMock(TestHandler testHandler) throws Exception {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+
+        DoggedCubeBuilder mockDoggedCubeBuilder = createMockDoggedCubeBuilder(testHandler);
+        PowerMockito.whenNew(DoggedCubeBuilder.class).withAnyArguments().thenReturn(mockDoggedCubeBuilder);
+    }
+
+    /** Creates an input side whose getFlatTableInputFormat() returns the mock input format. */
+    private IMRBatchCubingInputSide createMockInputSide() throws Exception {
+        IMRInput.IMRTableInputFormat mockInputFormat = createMockInputFormat();
+        IMRBatchCubingInputSide mockInputSide = PowerMockito.mock(IMRBatchCubingInputSide.class);
+        PowerMockito.when(mockInputSide.getFlatTableInputFormat()).thenReturn(mockInputFormat);
+        return mockInputSide;
+    }
+
+    /** Creates an input format whose parseMapperInput(...) always yields the single mock row. */
+    private IMRTableInputFormat createMockInputFormat() throws Exception {
+        String[] row = getMockInputRow();
+        Collection<String[]> rows = new ArrayList<>();
+        rows.add(row);
+        IMRTableInputFormat mockFormat = PowerMockito.mock(IMRTableInputFormat.class);
+        PowerMockito.when(mockFormat, "parseMapperInput", Mockito.any()).thenReturn(rows);
+        return mockFormat;
+    }
+
+    /** The one row the mocked input format produces and the handlers assert on. */
+    private static String[] getMockInputRow() {
+        return new String[] { "2012-01-01" };
+    }
+
+    /**
+     * Creates a DoggedCubeBuilder mock whose buildAsRunnable(...) passes its first argument
+     * (the input queue) to the handler and returns the handler as the runnable.
+     */
+    private DoggedCubeBuilder createMockDoggedCubeBuilder(final TestHandler handler) throws Exception {
+        DoggedCubeBuilder mockDoggedCubeBuilder = PowerMockito.mock(DoggedCubeBuilder.class);
+        PowerMockito.when(mockDoggedCubeBuilder, "buildAsRunnable", Mockito.any(BlockingQueue.class), Mockito.any(),
+                Mockito.any()).thenAnswer(new Answer<Object>() {
+                    @Override
+                    @SuppressWarnings("unchecked") // the handlers consume every queue element as String[]
+                    public Object answer(InvocationOnMock invocation) throws Throwable {
+                        Object[] args = invocation.getArguments();
+                        handler.setInputQueue((BlockingQueue<String[]>) args[0]);
+                        return handler;
+                    }
+                });
+        return mockDoggedCubeBuilder;
+    }
+
+    /**
+     * Runnable handed back by the mocked builder: asserts on the leading row(s) via
+     * {@link #consumeRows()}, then drains the queue until END_ROW (identity comparison) arrives.
+     */
+    private static class TestHandler implements Runnable {
+        BlockingQueue<String[]> queue;
+
+        public void setInputQueue(BlockingQueue<String[]> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                consumeRows();
+
+                while (true) {
+                    String[] newRow = queue.take();
+                    if (InputConverterUnitForRawData.END_ROW == newRow) {
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                // restore the interrupt flag so whoever runs this handler can still observe it
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /** Takes the first row from the queue and checks it equals the mock input row. */
+        protected void consumeRows() throws InterruptedException {
+            String[] row = queue.take();
+            assertArrayEquals(getMockInputRow(), row);
+        }
+
+    }
+
+    /** Handler variant that additionally expects CUT_ROW immediately after the first row. */
+    private static class TestWithCutRowHandler extends TestHandler {
+
+        @Override
+        protected void consumeRows() throws InterruptedException {
+            String[] row = queue.take();
+            assertArrayEquals(getMockInputRow(), row);
+
+            row = queue.take();
+            assertArrayEquals(InputConverterUnitForRawData.CUT_ROW, row);
+        }
+
+    }
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
new file mode 100644
index 0000000..acc5f6d
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.streaming; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.stream.core.model.StreamingMessage; +import org.apache.kylin.stream.core.query.StreamingQueryProfile; +import org.apache.kylin.stream.core.storage.columnar.ColumnarMemoryStorePersister; +import org.apache.kylin.stream.core.storage.columnar.DataSegmentFragment; +import org.apache.kylin.stream.core.storage.columnar.FragmentData; +import org.apache.kylin.stream.core.storage.columnar.FragmentFileSearcher; +import 
org.apache.kylin.stream.core.storage.columnar.FragmentId;
+import org.apache.kylin.stream.core.storage.columnar.ParsedStreamingCubeInfo;
+import org.apache.kylin.stream.core.storage.columnar.SegmentMemoryStore;
+import org.apache.kylin.stream.core.storage.columnar.StreamingDataSimulator;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Persists simulated streaming events into one columnar fragment and checks that
+ * {@link RowRecordReader} iterates over exactly as many rows as events were generated.
+ */
+public class RowRecordReaderTest extends LocalFileMetadataTestCase {
+    private static final String cubeName = "test_streaming_v2_cube";
+
+    private String baseStorePath;
+    private CubeInstance cubeInstance;
+    private String segmentName;
+    private ParsedStreamingCubeInfo parsedStreamingCubeInfo;
+    private DataSegmentFragment fragment;
+    private FragmentFileSearcher fragmentFileSearcher;
+    private CubeDesc cubeDesc;
+    private int eventCnt = 50000;
+
+    /** Loads the cube, writes the simulated fragment and sets a query profile. */
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+        baseStorePath = KylinConfig.getInstanceFromEnv().getStreamingIndexPath();
+        cubeInstance = CubeManager.getInstance(getTestConfig()).reloadCube(cubeName);
+        cubeDesc = cubeInstance.getDescriptor();
+        segmentName = "20171018100000_20171018110000";
+        parsedStreamingCubeInfo = new ParsedStreamingCubeInfo(cubeInstance);
+        fragment = new DataSegmentFragment(baseStorePath, cubeName, segmentName, new FragmentId(0));
+        PropertyConfigurator.configure("../build/conf/kylin-tools-log4j.properties");
+        prepareData();
+
+        FragmentData fragmentData = new FragmentData(fragment.getMetaInfo(), fragment.getDataFile());
+        fragmentFileSearcher = new FragmentFileSearcher(fragment, fragmentData);
+        StreamingQueryProfile.set(new StreamingQueryProfile("test-query-id", System.currentTimeMillis()));
+    }
+
+    /** Cleans up the test metadata, then deletes the streaming index directory. */
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        cleanupData();
+    }
+
+    /**
+     * Reads every row of the fragment directory, prints the first ten rows to stdout and
+     * asserts that the total row count equals the number of simulated events.
+     */
+    @Test
+    public void testIterator() throws Exception {
+        Path fragmentDir = new Path(fragment.getDataFile().getParentFile().getAbsolutePath());
+        FileSystem localFs = FileSystem.getLocal(new Configuration());
+        RowRecordReader reader = new RowRecordReader(cubeDesc, fragmentDir, localFs);
+
+        List<MeasureDesc> measures = cubeDesc.getMeasures();
+        List<DataTypeSerializer> serializers = Lists.newArrayListWithCapacity(measures.size());
+        for (MeasureDesc measure : measures) {
+            serializers.add(DataTypeSerializer.create(measure.getFunction().getReturnDataType()));
+        }
+
+        int rowCount = 0;
+        for (; reader.hasNextRow(); rowCount++) {
+            RowRecord record = reader.nextRow();
+            if (rowCount >= 10) {
+                // only the first ten rows are printed; the rest are just counted
+                continue;
+            }
+            System.out.println(formatRow(record, serializers));
+        }
+        assertEquals(eventCnt, rowCount);
+    }
+
+    /**
+     * Renders a row as its dimensions followed by its deserialized metrics, each value
+     * followed by a comma. The i-th serializer decodes the i-th metric.
+     */
+    private static String formatRow(RowRecord record, List<DataTypeSerializer> serializers) {
+        String[] dimensions = record.getDimensions();
+        byte[][] metrics = record.getMetrics();
+
+        Object[] metricValues = new Object[metrics.length];
+        for (int i = 0; i < metrics.length; i++) {
+            metricValues[i] = serializers.get(i).deserialize(ByteBuffer.wrap(metrics[i]));
+        }
+
+        StringBuilder line = new StringBuilder();
+        for (String dimension : dimensions) {
+            line.append(dimension);
+            line.append(",");
+        }
+        for (Object metricValue : metricValues) {
+            line.append(metricValue.toString());
+            line.append(",");
+        }
+        return line.toString();
+    }
+
+    /**
+     * Turns on "kylin.stream.build.additional.cuboids", indexes {@code eventCnt} simulated
+     * messages into a memory store and persists that store into the fragment.
+     */
+    protected void prepareData() {
+        // build additional cuboids
+        KylinConfigExt cubeConfig = (KylinConfigExt) cubeInstance.getDescriptor().getConfig();
+        cubeConfig.getExtendedOverrides().put("kylin.stream.build.additional.cuboids", "true");
+
+        ColumnarMemoryStorePersister persister = new ColumnarMemoryStorePersister(parsedStreamingCubeInfo,
+                segmentName);
+        StreamingDataSimulator simulator = new StreamingDataSimulator(
+                StreamingDataSimulator.getDefaultCardinalityMap(), 100000);
+        Iterator<StreamingMessage> messages = simulator.simulate(eventCnt, System.currentTimeMillis());
+        SegmentMemoryStore store = new SegmentMemoryStore(new ParsedStreamingCubeInfo(cubeInstance), segmentName);
+        while (messages.hasNext()) {
+            StreamingMessage message = messages.next();
+            store.index(message);
+        }
+
+        persister.persist(store, fragment);
+    }
+
+    /** Deletes the streaming index directory, ignoring failures. */
+    private void cleanupData() throws IOException {
+        File storeDir = new File(baseStorePath);
+        FileUtils.deleteQuietly(storeDir);
+    }
+
+}
diff --git a/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq b/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq
new file mode 100644
index 0000000..02bcf8c
Binary files /dev/null and b/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq differ
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 99873fd..2b82720 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -176,6 +176,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-annotations</artifactId>
             <scope>provided</scope>
diff --git a/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java b/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java
new file mode 100644
index 0000000..8d9fd7e
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Feeds a single HCatalog record to {@link FactDistinctColumnsMapper} through MRUnit's
+ * {@link MapDriver} and checks the number of records the mapper emits.
+ */
+public class FactDistinctColumnsMapperTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private MapDriver<LongWritable, Object, SelfDefineSortableKey, Text> mapDriver;
+
+    /** Creates the test metadata, mirrors it into ./meta and builds the map driver. */
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        FactDistinctColumnsMapper<LongWritable> factDistinctColumnsMapper = new FactDistinctColumnsMapper<>();
+        mapDriver = MapDriver.newMapDriver(factDistinctColumnsMapper);
+    }
+
+    /** Cleans up the test metadata and always removes the ./meta copy made by {@link #setup()}. */
+    @After
+    public void after() throws Exception {
+        try {
+            cleanupTestMetadata();
+        } finally {
+            // setup() copies the metadata into ./meta; do not leave it behind in the working directory
+            FileUtils.deleteDirectory(new File("./meta"));
+        }
+    }
+
+    /**
+     * Maps one 11-field record with a sampling percentage of 100 and expects
+     * {@code colsNeedDict + (rowKeyColumns - colsNeedDict) * 2 + cuboids} output records.
+     */
+    @Test
+    public void testMapper() throws IOException {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        // use the same cube name the descriptor under test was loaded with in setup()
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+        HCatRecord value1 = new DefaultHCatRecord(11);
+        value1.set(0, "2012-08-16");
+        value1.set(1, "48027");
+        value1.set(2, "0");
+        value1.set(3, "Home & Garden");
+        value1.set(4, "Cheese & Crackers");
+        value1.set(5, "Cheese & Crackers");
+        value1.set(6, "48027");
+        value1.set(7, "16");
+        value1.set(8, "10000010");
+        value1.set(9, "204.28");
+        value1.set(10, "5");
+        mapDriver.addInput(new LongWritable(0), value1);
+
+        List<Pair<SelfDefineSortableKey, Text>> result = mapDriver.run();
+        int colsNeedDictSize = cubeDesc.getAllColumnsNeedDictionaryBuilt().size();
+        int cuboidsCnt = cubeDesc.getAllCuboids().size();
+
+        assertEquals(
+                colsNeedDictSize + (cubeDesc.getRowkey().getRowKeyColumns().length - colsNeedDictSize) * 2 + cuboidsCnt,
+                result.size());
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index b3e80b5..bd916b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@
         </sonar.jacoco.excludes>
         <!-- JVM Args for Testing -->
-        <argLine>-Xms1G -Xmx2G -XX:MaxPermSize=512M -Duser.timezone=UTC</argLine>
+        <argLine>-Xms1G -Xmx2G -XX:MaxPermSize=512M -noverify -Duser.timezone=UTC</argLine>
     </properties>
     <licenses>