Revert "KYLIN-1726 add test case BuildCubeWithStream2"

This reverts commit 3e081b3fbec4fc8a6cc4ddf8795d2fd581ae04f4.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f9692fa5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f9692fa5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f9692fa5 Branch: refs/heads/tempmaster Commit: f9692fa5599de04030599b699f5460bdc7cf0e42 Parents: c68bba0 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Sep 19 23:49:51 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Sep 19 23:49:51 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/streaming/Kafka10DataLoader.java | 4 + .../apache/kylin/common/KylinConfigBase.java | 4 - .../java/org/apache/kylin/cube/CubeManager.java | 28 +- .../org/apache/kylin/job/dao/ExecutableDao.java | 1 - .../kylin/job/manager/ExecutableManager.java | 2 +- .../streaming/cube/StreamingCubeBuilder.java | 2 +- .../test_streaming_table_cube_desc.json | 3 +- .../kylin/provision/BuildCubeWithStream.java | 32 +-- .../kylin/provision/BuildCubeWithStream2.java | 274 ------------------- .../kylin/rest/controller/CubeController.java | 8 +- .../apache/kylin/rest/service/JobService.java | 4 +- .../kylin/source/kafka/SeekOffsetStep.java | 7 +- 12 files changed, 49 insertions(+), 320 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java index 2b299cc..a5132af 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java @@ -65,9 +65,13 @@ public class Kafka10DataLoader extends 
StreamDataLoader { props.put("retry.backoff.ms", "1000"); KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props); + int boundary = messages.size() / 10; for (int i = 0; i < messages.size(); ++i) { ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); producer.send(keyedMessage); + if (i % boundary == 0) { + logger.info("sending " + i + " messages to " + this.toString()); + } } logger.info("sent " + messages.size() + " messages to " + this.toString()); producer.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3b06ed8..fafb1fc 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -809,8 +809,4 @@ abstract public class KylinConfigBase implements Serializable { public String getCreateFlatHiveTableMethod() { return getOptional("kylin.hive.create.flat.table.method", "1"); } - - public int getMaxBuildingSegments() { - return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "1")); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 57b9510..d494fcc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -400,8 +400,13 @@ public class CubeManager 
implements IRealizationProvider { } public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { + return appendSegment(cube, startDate, endDate, startOffset, endOffset, true); + } + + public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException { - checkBuildingSegment(cube); + if (strictChecking) + checkNoBuildingSegment(cube); if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) { // try figure out a reasonable start if missing @@ -431,9 +436,12 @@ public class CubeManager implements IRealizationProvider { updateCube(cubeBuilder); return newSegment; } - public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { - checkBuildingSegment(cube); + return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true); + } + + public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException { + checkNoBuildingSegment(cube); CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); @@ -454,7 +462,7 @@ public class CubeManager implements IRealizationProvider { if (startDate >= endDate && startOffset >= endOffset) throw new IllegalArgumentException("Invalid merge range"); - checkBuildingSegment(cube); + checkNoBuildingSegment(cube); checkCubeIsPartitioned(cube); boolean isOffsetsOn = cube.getSegments().get(0).isSourceOffsetsOn(); @@ -580,10 +588,9 @@ public class CubeManager implements IRealizationProvider { } } - private void checkBuildingSegment(CubeInstance cube) { - int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); - if (cube.getBuildingSegments().size() >= maxBuldingSeg) { - throw new IllegalStateException("There is already " + 
cube.getBuildingSegments().size() + " building segment; "); + private void checkNoBuildingSegment(CubeInstance cube) { + if (cube.getBuildingSegments().size() > 0) { + throw new IllegalStateException("There is already a building segment!"); } } @@ -722,9 +729,8 @@ public class CubeManager implements IRealizationProvider { } for (CubeSegment seg : tobe) { - if (isReady(seg) == false) { - logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet."); - } + if (isReady(seg) == false) + throw new IllegalStateException("For cube " + cube + ", segment " + seg + " should be READY but is not"); } List<CubeSegment> toRemoveSegs = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 5cae5ac..8808a56 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -207,7 +207,6 @@ public class ExecutableDao { } public void updateJobOutput(ExecutableOutputPO output) throws PersistentException { - logger.debug("updating job output, id: " + output.getUuid()); try { final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output); output.setLastModified(ts); http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java index d42b924..3a19486 100644 --- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ 
b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java @@ -278,7 +278,7 @@ public class ExecutableManager { ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus()); if (newStatus != null && oldStatus != newStatus) { if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) { - throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId); + throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus); } jobOutput.setStatus(newStatus.toString()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java index a42ec05..180f0b8 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java @@ -119,7 +119,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder { CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); try { - CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0); + CubeSegment segment = cubeManager.appendSegment(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), 0, 0, false); segment.setLastBuildJobID(segment.getUuid()); // give a fake job id segment.setInputRecords(streamingBatch.getMessages().size()); 
segment.setLastBuildTime(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json index 8279417..ef10c1e 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json @@ -106,8 +106,7 @@ } } ], "override_kylin_properties": { - "kylin.cube.algorithm": "inmem", - "kylin.cube.building.segment.max": "3" + "kylin.cube.algorithm": "inmem" }, "notify_list" : [ ], "status_need_notify" : [ ], http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index b7c609e..9e779ab 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -137,21 +137,15 @@ public class BuildCubeWithStream { int numberOfRecrods1 = 10000; generateStreamData(date1, date2, numberOfRecrods1); - ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.SUCCEED); - long date3 = f.parse("2013-04-01").getTime(); - int numberOfRecords2 = 5000; - generateStreamData(date2, date3, numberOfRecords2); - result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.SUCCEED); + buildSegment(cubeName, 0, Long.MAX_VALUE); - //empty build - 
result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.DISCARDED); + long date3 = f.parse("2013-04-01").getTime(); + int numberOfRecrods2 = 5000; + generateStreamData(date2, date3, numberOfRecrods2); + buildSegment(cubeName, 0, Long.MAX_VALUE); //merge - result = mergeSegment(cubeName, 0, 15000); - Assert.assertTrue(result == ExecutableState.SUCCEED); + mergeSegment(cubeName, 0, 15000); List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(); Assert.assertTrue(segments.size() == 1); @@ -165,16 +159,16 @@ public class BuildCubeWithStream { } - private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { + private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); - return job.getStatus(); + return job.getId(); } private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception { - CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); segment.setAdditionalInfo(partitionOffsetMap); CubeInstance cubeInstance = cubeManager.getCube(cubeName); CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); @@ -187,12 +181,12 @@ public class BuildCubeWithStream { return job.getId(); } - private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + private String buildSegment(String cubeName, 
long startOffset, long endOffset) throws Exception { + CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); - return job.getStatus(); + return job.getId(); } protected void deployEnv() throws IOException { @@ -222,7 +216,7 @@ public class BuildCubeWithStream { protected void waitForJob(String jobId) { while (true) { AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) { + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { break; } else { try { http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java deleted file mode 100644 index d48a473..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.provision; - -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.Random; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Lists; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.engine.streaming.StreamingManager; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.job.streaming.Kafka10DataLoader; -import 
org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.config.BrokerConfig; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.Thread.sleep; - -/** - * for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently. - */ -public class BuildCubeWithStream2 { - - private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class); - - private CubeManager cubeManager; - private DefaultScheduler scheduler; - protected ExecutableManager jobService; - private static final String cubeName = "test_streaming_table_cube"; - - private KafkaConfig kafkaConfig; - private MockKafka kafkaServer; - private static boolean generateData = true; - - public void before() throws Exception { - deployEnv(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - final String factTable = cubeInstance.getFactTable(); - - final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); - final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable); - kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName()); - - String topicName = UUID.randomUUID().toString(); - String localIp = NetworkUtils.getLocalIp(); - BrokerConfig brokerConfig = 
kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0); - brokerConfig.setHost(localIp); - kafkaConfig.setTopic(topicName); - KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig); - - startEmbeddedKafka(topicName, brokerConfig); - } - - private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { - //Start mock Kakfa - String zkConnectionStr = "sandbox:2181"; - ZkConnection zkConnection = new ZkConnection(zkConnectionStr); - // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); - kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); - kafkaServer.start(); - - kafkaServer.createTopic(topicName, 3, 1); - kafkaServer.waitTopicUntilReady(topicName); - - MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName); - Assert.assertEquals(topicName, topicMetadata.topic()); - } - - private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { - if (numberOfRecords <= 0) - return; - Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); - logger.info("Test data inserted into Kafka"); - } - - private void clearSegment(String cubeName) throws Exception { - CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); - } - - public void build() throws Exception { - clearSegment(cubeName); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - final long date1 = 0; - final long date2 = f.parse("2013-01-01").getTime(); - - new Thread(new Runnable() { - @Override - public void run() { - - Random rand = new Random(); - 
while (generateData == true) { - try { - generateStreamData(date1, date2, rand.nextInt(100)); - sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time, from 0 to 100 seconds - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - }).start(); - ExecutorService executorService = Executors.newFixedThreadPool(4); - - List<FutureTask<ExecutableState>> futures = Lists.newArrayList(); - for (int i = 0; i < 5; i++) { - FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() { - @Override - public ExecutableState call() { - ExecutableState result = null; - try { - result = buildSegment(cubeName, 0, Long.MAX_VALUE); - } catch (Exception e) { - e.printStackTrace(); - } - - return result; - } - }); - - executorService.submit(futureTask); - futures.add(futureTask); - Thread.sleep(2 * 60 * 1000); // sleep 2 mintues - } - - generateData = false; // stop generating message to kafka - executorService.shutdown(); - int succeedBuild = 0; - for (int i = 0; i < futures.size(); i++) { - ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES); - logger.info("Checking building task " + i + " whose state is " + result); - Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED ); - if (result == ExecutableState.SUCCEED) - succeedBuild++; - } - - logger.info(succeedBuild + " build jobs have been successfully completed."); - List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY); - Assert.assertTrue(segments.size() == succeedBuild); - - } - - - private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); - DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); 
- return job.getStatus(); - } - - protected void deployEnv() throws IOException { - DeployUtil.overrideJobJarLocations(); - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - } - - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); - if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); - } - - public static void afterClass() throws Exception { - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - public void after() { - kafkaServer.stop(); - } - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) { - break; - } else { - try { - sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - public static void main(String[] args) throws Exception { - try { - beforeClass(); - - BuildCubeWithStream2 buildCubeWithStream = new BuildCubeWithStream2(); - buildCubeWithStream.before(); - buildCubeWithStream.build(); - logger.info("Build is done"); - buildCubeWithStream.after(); - afterClass(); - logger.info("Going to exit"); - System.exit(0); - } catch (Exception e) { - logger.error("error", e); - System.exit(1); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java 
---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 42b117c..669f53e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -272,7 +272,7 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) { - return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment()); + return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, req.getBuildType(), true, req.isForce() || req.isForceMergeEmptySegment()); } /** Build/Rebuild a cube segment by source offset */ @@ -286,16 +286,16 @@ public class CubeController extends BasicController { @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT }) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) { - return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), req.isForce()); + return buildInternal(cubeName, 0, 0, req.getStartSourceOffset(), req.getEndSourceOffset(), req.getBuildType(), false, req.isForce()); } private JobInstance buildInternal(String cubeName, long startTime, long endTime, // - long startOffset, long endOffset, String buildType, boolean force) { + long startOffset, long endOffset, String buildType, boolean strictCheck, boolean force) { try { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); 
return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, // - CubeBuildTypeEnum.valueOf(buildType), force, submitter); + CubeBuildTypeEnum.valueOf(buildType), strictCheck, force, submitter); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage()); http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 5c704ba..8929bf1 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -199,7 +199,7 @@ public class JobService extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, // - CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException, JobException { + CubeBuildTypeEnum buildType, boolean strictCheck, boolean force, String submitter) throws IOException, JobException { if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { throw new BadRequestException("Broken cube " + cube.getName() + " can't be built"); @@ -211,7 +211,7 @@ public class JobService extends BasicService { DefaultChainedExecutable job; if (buildType == CubeBuildTypeEnum.BUILD) { - CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset); + CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, strictCheck); job = 
EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) { CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); http://git-wip-us.apache.org/repos/asf/kylin/blob/f9692fa5/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java index 9369e6f..479f1b8 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java @@ -17,6 +17,10 @@ */ package org.apache.kylin.source.kafka; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Maps; +import org.apache.commons.math3.util.MathUtils; import org.apache.kylin.source.kafka.util.KafkaClient; import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,6 +38,7 @@ import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -120,7 +125,7 @@ public class SeekOffsetStep extends AbstractExecutable { } catch (IOException e) { return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } else { CubeUpdate cubeBuilder = new CubeUpdate(cube); cubeBuilder.setToRemoveSegs(segment);