Revert "refactor BuildCubeWithStream"

This reverts commit a08dd2e03900b321617647d1dbf1c4ee8b4b18c2.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/34970d9f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/34970d9f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/34970d9f Branch: refs/heads/tempmaster Commit: 34970d9fb4a6f4ef0237e5091dc0c7eb1ee86296 Parents: e347c75 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Sep 19 23:49:18 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Sep 19 23:49:18 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 7 +- .../kylin/provision/BuildCubeWithStream.java | 10 +- .../kylin/provision/BuildCubeWithStream2.java | 145 ++++++++++++++++++- 3 files changed, 150 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/34970d9f/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 9e9df05..9b282e3 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -187,7 +187,6 @@ public class DeployUtil { File tmpFile = File.createTempFile(factTableName, "csv"); FileOutputStream out = new FileOutputStream(tmpFile); - InputStream tempIn = null; try { if (store.exists(factTablePath)) { InputStream oldContent = store.getResource(factTablePath).inputStream; @@ -195,15 +194,13 @@ public class DeployUtil { } IOUtils.copy(in, out); IOUtils.closeQuietly(in); - IOUtils.closeQuietly(out); store.deleteResource(factTablePath); - tempIn = new FileInputStream(tmpFile); - store.putResource(factTablePath, tempIn, System.currentTimeMillis()); + in = new FileInputStream(tmpFile); + 
store.putResource(factTablePath, in, System.currentTimeMillis()); } finally { IOUtils.closeQuietly(out); IOUtils.closeQuietly(in); - IOUtils.closeQuietly(tempIn); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/34970d9f/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index bfe1d0a..6e5313f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -62,10 +62,10 @@ public class BuildCubeWithStream { private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class); - protected CubeManager cubeManager; + private CubeManager cubeManager; private DefaultScheduler scheduler; protected ExecutableManager jobService; - static final String cubeName = "test_streaming_table_cube"; + private static final String cubeName = "test_streaming_table_cube"; private KafkaConfig kafkaConfig; private MockKafka kafkaServer; @@ -114,13 +114,13 @@ public class BuildCubeWithStream { Assert.assertEquals(topicName, topicMetadata.topic()); } - protected void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { + private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); logger.info("Test data inserted into Kafka"); } - protected void clearSegment(String cubeName) throws Exception { + private void clearSegment(String cubeName) throws Exception { CubeInstance cube = cubeManager.getCube(cubeName); // remove all existing 
segments CubeUpdate cubeBuilder = new CubeUpdate(cube); @@ -187,7 +187,7 @@ public class BuildCubeWithStream { return job.getId(); } - protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { + private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/34970d9f/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java index 7959701..2812446 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java @@ -18,11 +18,13 @@ package org.apache.kylin.provision; +import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.List; import java.util.Random; import java.util.TimeZone; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,9 +32,32 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import 
org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.job.streaming.Kafka10DataLoader; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.BrokerConfig; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +67,79 @@ import static java.lang.Thread.sleep; /** * for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently. 
*/ -public class BuildCubeWithStream2 extends BuildCubeWithStream { +public class BuildCubeWithStream2 { private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class); + + private CubeManager cubeManager; + private DefaultScheduler scheduler; + protected ExecutableManager jobService; + private static final String cubeName = "test_streaming_table_cube"; + + private KafkaConfig kafkaConfig; + private MockKafka kafkaServer; private static boolean generateData = true; - @Override + public void before() throws Exception { + deployEnv(); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.createInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + cubeManager = CubeManager.getInstance(kylinConfig); + + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final String factTable = cubeInstance.getFactTable(); + + final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); + final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable); + kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName()); + + String topicName = UUID.randomUUID().toString(); + String localIp = NetworkUtils.getLocalIp(); + BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0); + brokerConfig.setHost(localIp); + kafkaConfig.setTopic(topicName); + KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig); + + startEmbeddedKafka(topicName, brokerConfig); + } + + private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { + //Start mock Kakfa + String zkConnectionStr = "sandbox:2181"; + ZkConnection zkConnection = new 
ZkConnection(zkConnectionStr); + // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); + kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); + kafkaServer.start(); + + kafkaServer.createTopic(topicName, 3, 1); + kafkaServer.waitTopicUntilReady(topicName); + + MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName); + Assert.assertEquals(topicName, topicMetadata.topic()); + } + + private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { + if (numberOfRecords <= 0) + return; + Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); + logger.info("Test data inserted into Kafka"); + } + + private void clearSegment(String cubeName) throws Exception { + CubeInstance cube = cubeManager.getCube(cubeName); + // remove all existing segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + cubeManager.updateCube(cubeBuilder); + } + public void build() throws Exception { clearSegment(cubeName); SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); @@ -112,6 +204,55 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream { } + + private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { + CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getStatus(); + } + + protected void deployEnv() throws IOException { + DeployUtil.overrideJobJarLocations(); + DeployUtil.initCliWorkDir(); + DeployUtil.deployMetadata(); + } + + public static void beforeClass() throws 
Exception { + logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); + if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + } + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); + } + + public static void afterClass() throws Exception { + HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + public void after() { + kafkaServer.stop(); + DefaultScheduler.destroyInstance(); + } + + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) { + break; + } else { + try { + sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + public static void main(String[] args) throws Exception { try { beforeClass();