KYLIN-1011
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/55a85df0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/55a85df0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/55a85df0 Branch: refs/heads/KYLIN-1011 Commit: 55a85df0cfbabbe0a176d05ae8830ea4c6ec9fa6 Parents: cc15d63 Author: qianhao.zhou <[email protected]> Authored: Wed Sep 9 19:13:01 2015 +0800 Committer: qianhao.zhou <[email protected]> Committed: Tue Sep 22 17:29:32 2015 +0800 ---------------------------------------------------------------------- .../kylin/job/streaming/KafkaDataLoader.java | 54 +++++++ build/bin/kylin.sh | 4 +- build/script/compress.sh | 1 + .../kylin/engine/streaming/StreamingCLI.java | 99 ------------ .../kylin/engine/streaming/cli/MonitorCLI.java | 70 +++++++++ .../engine/streaming/cli/StreamingCLI.java | 120 +++++++++++++++ .../streaming/monitor/StreamingMonitor.java | 154 +++++++++++++++++++ .../engine/streaming/util/StreamingUtils.java | 2 +- .../kafka/ByteBufferBackedInputStream.java | 53 +++++++ .../kylin/source/kafka/KafkaStreamingInput.java | 6 +- .../source/kafka/TimedJsonStreamParser.java | 142 +++++++++++++++++ .../kylin/source/kafka/config/KafkaConfig.java | 12 ++ .../apache/kylin/job/monitor/MonitorCLI.java | 69 --------- .../kylin/job/monitor/StreamingMonitor.java | 154 ------------------- .../kylin/job/streaming/KafkaDataLoader.java | 54 ------- .../kylin/job/streaming/StreamingBootstrap.java | 2 +- 16 files changed, 614 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java new file mode 100644 index 0000000..95fbc9d --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java @@ -0,0 +1,54 @@ +package org.apache.kylin.job.streaming; + +import java.util.List; +import java.util.Properties; + +import javax.annotation.Nullable; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.streaming.BrokerConfig; +import org.apache.kylin.streaming.KafkaClusterConfig; +import org.apache.kylin.streaming.StreamingConfig; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; + +/** + * Load prepared data into kafka(for test use) + */ +public class KafkaDataLoader { + + public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) { + + KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0); + String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { + @Nullable + @Override + public String apply(BrokerConfig brokerConfig) { + return brokerConfig.getHost() + ":" + brokerConfig.getPort(); + } + }), ","); + Properties props = new Properties(); + props.put("metadata.broker.list", brokerList); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("request.required.acks", "1"); + + ProducerConfig config = new ProducerConfig(props); + + Producer<String, String> producer = new Producer<String, String>(config); + + List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList(); + for (int i = 0; i < messages.size(); ++i) { + KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i)); + keyedMessages.add(keyedMessage); + } + producer.send(keyedMessages); + producer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/build/bin/kylin.sh ---------------------------------------------------------------------- diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index b27864c..b581e09 100644 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -115,7 +115,7 @@ then -Dkylin.hive.dependency=${hive_dependency} \ -Dkylin.hbase.dependency=${hbase_dependency} \ -Dspring.profiles.active=${spring_profile} \ - org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 & + org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 & echo "streaming started name: $3 id: $4" exit 0 elif [ $2 == "stop" ] @@ -170,7 +170,7 @@ then -Dkylin.hive.dependency=${hive_dependency} \ -Dkylin.hbase.dependency=${hbase_dependency} \ -Dspring.profiles.active=${spring_profile} \ - org.apache.kylin.job.monitor.MonitorCLI $@ >> ${KYLIN_HOME}/logs/monitor.log 2>&1 + org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1 exit 0 else echo "usage: kylin.sh start or kylin.sh stop" http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/build/script/compress.sh ---------------------------------------------------------------------- diff --git a/build/script/compress.sh b/build/script/compress.sh index a424b98..c70e567 100755 --- a/build/script/compress.sh +++ b/build/script/compress.sh @@ -21,6 +21,7 @@ rm -rf lib tomcat commit_SHA1 find kylin-${version} -type d -exec chmod 755 {} \; find kylin-${version} -type f -exec chmod 644 {} \; find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \; +mkdir -p ../dist tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version} rm -rf kylin-${version} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java deleted file mode 100644 index 8bf52c1..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - -package org.apache.kylin.engine.streaming; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.cache.RemoteCacheUpdater; -import org.apache.kylin.common.restclient.AbstractRestCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public class StreamingCLI { - - private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class); - - public static void main(String[] args) { - try { - AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater()); - - Preconditions.checkArgument(args[0].equals("streaming")); - Preconditions.checkArgument(args[1].equals("start")); - - int i = 2; - BootstrapConfig bootstrapConfig = new BootstrapConfig(); - while (i < args.length) { - String argName = args[i]; - switch (argName) { - case "-oneoff": - bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i])); - break; - case "-start": - bootstrapConfig.setStart(Long.parseLong(args[++i])); - break; - case "-end": - bootstrapConfig.setEnd(Long.parseLong(args[++i])); - break; - case "-streaming": - bootstrapConfig.setStreaming(args[++i]); - break; - case "-partition": - bootstrapConfig.setPartitionId(Integer.parseInt(args[++i])); - break; - case "-fillGap": - bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i])); - break; - default: - logger.warn("ignore this arg:" + argName); - } - i++; - } - final Runnable runnable = new OneOffStreamingBuilder(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()).build(); - runnable.run(); - logger.info("streaming process stop, exit with 0"); - System.exit(0); - } catch (Exception e) { - printArgsError(args); - logger.error("error start streaming", e); - System.exit(-1); - } - } - - private static void printArgsError(String[] args) { - logger.warn("invalid args:" + StringUtils.join(args, " ")); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java new file mode 100644 index 0000000..d7dc6b3 --- /dev/null +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java @@ -0,0 +1,70 @@ +package org.apache.kylin.engine.streaming.cli; + +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + */ +public class MonitorCLI { + + private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class); + + public static void main(String[] args) { + Preconditions.checkArgument(args[0].equals("monitor")); + + int i = 1; + List<String> receivers = null; + String host = null; + String tableName = null; + String authorization = null; + String cubeName = null; + String projectName = "default"; + while (i < args.length) { + String argName = args[i]; + switch (argName) { + case "-receivers": + receivers = Lists.newArrayList(StringUtils.split(args[++i], ";")); + break; + case "-host": + host = args[++i]; + break; + case "-tableName": + tableName = args[++i]; + break; + case "-authorization": + authorization = args[++i]; + break; + case "-cubeName": + cubeName = args[++i]; + break; + case "-projectName": + projectName = args[++i]; + break; + default: + throw new RuntimeException("invalid argName:" + argName); + } + i++; + } + Preconditions.checkArgument(receivers != null && receivers.size() > 0); + final StreamingMonitor streamingMonitor = new StreamingMonitor(); + if (tableName != null) { + logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";"))); + Preconditions.checkNotNull(host); + Preconditions.checkNotNull(authorization); + Preconditions.checkNotNull(tableName); + streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName); + } + if (cubeName != null) { + logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";"))); + streamingMonitor.checkCube(receivers, cubeName,host); + } + System.exit(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java new file mode 100644 index 0000000..a4ccabc --- /dev/null +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -0,0 +1,120 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + +package org.apache.kylin.engine.streaming.cli; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.cache.RemoteCacheUpdater; +import org.apache.kylin.common.restclient.AbstractRestCache; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.engine.streaming.BootstrapConfig; +import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class StreamingCLI { + + private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class); + + public static void main(String[] args) { + try { + AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater()); + + Preconditions.checkArgument(args[0].equals("streaming")); + Preconditions.checkArgument(args[1].equals("start")); + + int i = 2; + BootstrapConfig bootstrapConfig = new BootstrapConfig(); + while (i < args.length) { + String argName = args[i]; + switch (argName) { + case "-oneoff": + Boolean.parseBoolean(args[++i]); + break; + case "-start": + bootstrapConfig.setStart(Long.parseLong(args[++i])); + break; + case "-end": + bootstrapConfig.setEnd(Long.parseLong(args[++i])); + break; + case "-streaming": + bootstrapConfig.setStreaming(args[++i]); + break; + case "-partition": + bootstrapConfig.setPartitionId(Integer.parseInt(args[++i])); + break; + case "-fillGap": + bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i])); + break; + default: + logger.warn("ignore this arg:" + argName); + } + i++; + } + if (bootstrapConfig.isFillGap()) { + final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming()); + final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); + logger.info("all gaps:" + StringUtils.join(gaps, ",")); + for (Pair<Long, Long> gap : gaps) { + startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond()); + } + } else { + startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()); + logger.info("streaming process finished, exit with 0"); + System.exit(0); + } + } catch (Exception e) { + printArgsError(args); + logger.error("error start streaming", e); + System.exit(-1); + } + } + + private static void startOneOffCubeStreaming(String streaming, long start, long end) { + final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build(); + runnable.run(); + } + + private static void printArgsError(String[] args) { + logger.warn("invalid args:" + StringUtils.join(args, " ")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java new file mode 100644 index 0000000..a6b8a9f --- /dev/null +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java @@ -0,0 +1,154 @@ +package org.apache.kylin.engine.streaming.monitor; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.MailService; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + */ +public class StreamingMonitor { + + private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class); + + public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) { + String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") "; + StringBuilder stringBuilder = new StringBuilder(); + String url = host + "/kylin/api/query"; + PostMethod request = new PostMethod(url); + try { + + request.addRequestHeader("Authorization", "Basic " + authorization); + request.addRequestHeader("Content-Type", "application/json"); + String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName); + request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes())); + + int statusCode = new HttpClient().executeMethod(request); + String msg = Bytes.toString(request.getResponseBody()); + stringBuilder.append("host:").append(host).append("\n"); + stringBuilder.append("query:").append(query).append("\n"); + stringBuilder.append("statusCode:").append(statusCode).append("\n"); + if (statusCode == 200) { + title += "succeed"; + final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class); + stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n"); + stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n"); + } else { + title += "failed"; + stringBuilder.append("response:").append(msg).append("\n"); + } + } catch (Exception e) { + final StringWriter out = new StringWriter(); + e.printStackTrace(new PrintWriter(out)); + title += "failed"; + stringBuilder.append(out.toString()); + } finally { + request.releaseConnection(); + } + logger.info("title:" + title); + logger.info("content:" + stringBuilder.toString()); + sendMail(receivers, title, stringBuilder.toString()); + } + + public static final List<Pair<Long, Long>> findGaps(String cubeName) { + List<CubeSegment> segments = getSortedReadySegments(cubeName); + List<Pair<Long, Long>> gaps = Lists.newArrayList(); + for (int i = 0; i < segments.size() - 1; ++i) { + CubeSegment first = segments.get(i); + CubeSegment second = segments.get(i + 1); + if (first.getDateRangeEnd() == second.getDateRangeStart()) { + continue; + } else if (first.getDateRangeEnd() < second.getDateRangeStart()) { + gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart())); + } + } + return gaps; + } + + private static List<CubeSegment> getSortedReadySegments(String cubeName) { + final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); + Preconditions.checkNotNull(cube); + final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY); + logger.info("totally " + segments.size() + " cubeSegments"); + Collections.sort(segments); + return segments; + } + + public static final List<Pair<String, String>> findOverlaps(String cubeName) { + List<CubeSegment> segments = getSortedReadySegments(cubeName); + List<Pair<String, String>> overlaps = Lists.newArrayList(); + for (int i = 0; i < segments.size() - 1; ++i) { + CubeSegment first = segments.get(i); + CubeSegment second = segments.get(i + 1); + if (first.getDateRangeEnd() == second.getDateRangeStart()) { + continue; + } else { + overlaps.add(Pair.newPair(first.getName(), second.getName())); + } + } + return overlaps; + } + + public void checkCube(List<String> receivers, String cubeName, String host) { + final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); + if (cube == null) { + logger.info("cube:" + cubeName + " does not exist"); + return; + } + List<Pair<Long, Long>> gaps = findGaps(cubeName); + List<Pair<String, String>> overlaps = Lists.newArrayList(); + StringBuilder content = new StringBuilder(); + if (!gaps.isEmpty()) { + content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() { + @Nullable + @Override + public String apply(Pair<Long, Long> input) { + return parseInterval(input); + } + }), "\n")).append("\n"); + } + if (!overlaps.isEmpty()) { + content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n"); + } + if (content.length() > 0) { + logger.info(content.toString()); + sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString()); + } else { + logger.info("no gaps or overlaps"); + } + } + + private String parseInterval(Pair<Long, Long> interval) { + return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString()); + } + + private void sendMail(List<String> receivers, String title, String content) { + final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv()); + mailService.sendMail(receivers, title, content, false); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java index 718fc43..47db924 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java @@ -53,7 +53,7 @@ public class StreamingUtils { } public static IStreamingOutput getStreamingOutput(String streaming) { - return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStreamingOutput"); + return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput"); } public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java new file mode 100644 index 0000000..5883493 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.kafka; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + */ +class ByteBufferBackedInputStream extends InputStream { + + private ByteBuffer buf; + + public ByteBufferBackedInputStream(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public int read() throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + return buf.get() & 0xFF; + } + + @Override + public int read(byte[] bytes, int off, int len) + throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + + len = Math.min(len, buf.remaining()); + buf.get(bytes, off, len); + return len; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index 393b8e7..09dee50 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -79,6 +79,7 @@ public class KafkaStreamingInput implements IStreamingInput { @Override public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) { try { + logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime)); final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); final KafkaConfig kafkaConfig = kafkaConfigManager.getStreamingConfig(streaming); @@ -106,6 +107,7 @@ public class KafkaStreamingInput implements IStreamingInput { } } final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime); + logger.info("finish to get streaming batch, total message count:" + messages.size()); return new StreamingBatch(messages, timeRange); } catch (ReflectiveOperationException e) { throw new RuntimeException("failed to create instance of StreamingParser", e); @@ -220,8 +222,8 @@ public class KafkaStreamingInput implements IStreamingInput { }); if (!StringUtils.isEmpty(kafkaConfig.getParserName())) { Class clazz = Class.forName(kafkaConfig.getParserName()); - Constructor constructor = clazz.getConstructor(List.class); - return (StreamingParser) constructor.newInstance(columns); + Constructor constructor = clazz.getConstructor(List.class, String.class); + return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties()); } else { throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser"); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java new file mode 100644 index 0000000..9b5071b --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -0,0 +1,142 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + +package org.apache.kylin.source.kafka; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.google.common.collect.Lists; +import kafka.message.MessageAndOffset; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.TimeUtil; +import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +/** + * each json message with a "timestamp" field + */ +public final class TimedJsonStreamParser implements StreamingParser { + + private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class); + + private List<TblColRef> allColumns; + private boolean formatTs = false; + private final ObjectMapper mapper = new ObjectMapper(); + private String tsColName = "timestamp"; + private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); + + public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) { + this.allColumns = allColumns; + if (!StringUtils.isEmpty(propertiesStr)) { + String[] properties = propertiesStr.split(";"); + for (String prop : properties) { + try { + String[] parts = prop.split("="); + if (parts.length == 2) { + switch (parts[0]) { + case "formatTs": + this.formatTs = Boolean.valueOf(parts[1]); + break; + case "tsColName": + this.tsColName = parts[1]; + break; + default: + break; + } + } + } catch (Exception e) { + logger.error("Failed to parse property " + prop); + //ignore + } + } + } + + logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName); + } + + @Override + public StreamingMessage parse(MessageAndOffset messageAndOffset) { + try { + Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType); + String tsStr = root.get(tsColName); + //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + // + //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData())); + long t; + if (StringUtils.isEmpty(tsStr)) { + t = 0; + } else { + t = Long.valueOf(tsStr); + } + ArrayList<String> result = Lists.newArrayList(); + + for (TblColRef column : allColumns) { + String columnName = column.getName(); + if (columnName.equalsIgnoreCase("minute_start")) { + long minuteStart = TimeUtil.getMinuteStart(t); + result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart)); + } else if (columnName.equalsIgnoreCase("hour_start")) { + long hourStart = TimeUtil.getHourStart(t); + result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart)); + } else if (columnName.equalsIgnoreCase("day_start")) { + //of day start we'll add yyyy-mm-dd + long ts = TimeUtil.getDayStart(t); + result.add(DateFormat.formatToDateStr(ts)); + } else { + String x = root.get(columnName.toLowerCase()); + result.add(x); + } + } + + return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap()); + + } catch (IOException e) { + logger.error("error", e); + throw new RuntimeException(e); + } + } + + @Override + public boolean filter(StreamingMessage streamingMessage) { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index b56231a..1aff0ce 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -81,6 +81,10 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("margin") private long margin; + //"configA=1;configB=2" + @JsonProperty("parserProperties") + private String parserProperties; + public List<KafkaClusterConfig> getKafkaClusterConfigs() { return kafkaClusterConfigs; } @@ -141,6 +145,14 @@ public class KafkaConfig extends RootPersistentEntity { this.margin = margin; } + public String getParserProperties() { + return parserProperties; + } + + public void setParserProperties(String parserProperties) { + this.parserProperties = parserProperties; + } + @Override public KafkaConfig clone() { try { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java deleted file mode 100644 index 7b9831a..0000000 --- a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java +++ /dev/null @@ -1,69 +0,0 @@ -package org.apache.kylin.job.monitor; - -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - */ -public class MonitorCLI { - - private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class); - - public static void main(String[] args) { - Preconditions.checkArgument(args[0].equals("monitor")); - - int i = 1; - List<String> receivers = null; - String host = null; - String tableName = null; - String authorization = null; - String cubeName = null; - String projectName = "default"; - while (i < args.length) { - String argName = args[i]; - switch (argName) { - case "-receivers": - receivers = Lists.newArrayList(StringUtils.split(args[++i], ";")); - break; - case "-host": - host = args[++i]; - break; - case "-tableName": - tableName = args[++i]; - break; - case "-authorization": - authorization = args[++i]; - break; - case "-cubeName": - cubeName = args[++i]; - break; - case "-projectName": - projectName = args[++i]; - break; - default: - throw new RuntimeException("invalid argName:" + argName); - } - i++; - } - Preconditions.checkArgument(receivers != null && receivers.size() > 0); - final StreamingMonitor streamingMonitor = new StreamingMonitor(); - if (tableName != null) { - logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";"))); - Preconditions.checkNotNull(host); - Preconditions.checkNotNull(authorization); - Preconditions.checkNotNull(tableName); - streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName); - } - if (cubeName != null) { - logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";"))); - streamingMonitor.checkCube(receivers, cubeName,host); - } - System.exit(0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java deleted file mode 100644 index e23f065..0000000 --- a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java +++ /dev/null @@ -1,154 +0,0 @@ -package org.apache.kylin.job.monitor; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; - -import javax.annotation.Nullable; - -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; -import org.apache.commons.httpclient.methods.PostMethod; -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.MailService; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - */ -public class StreamingMonitor { - - private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class); - - public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) { - String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") "; - StringBuilder stringBuilder = new StringBuilder(); - String url = host + "/kylin/api/query"; - PostMethod request = new PostMethod(url); - try { - - request.addRequestHeader("Authorization", "Basic " + authorization); - request.addRequestHeader("Content-Type", "application/json"); - String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName); - request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes())); - - int statusCode = new HttpClient().executeMethod(request); - String msg = Bytes.toString(request.getResponseBody()); - stringBuilder.append("host:").append(host).append("\n"); - stringBuilder.append("query:").append(query).append("\n"); - stringBuilder.append("statusCode:").append(statusCode).append("\n"); - if (statusCode == 200) { - title += "succeed"; - final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class); - stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n"); - stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n"); - } else { - title += "failed"; - stringBuilder.append("response:").append(msg).append("\n"); - } - } catch (Exception e) { - final StringWriter out = new StringWriter(); - e.printStackTrace(new PrintWriter(out)); - title += "failed"; - stringBuilder.append(out.toString()); - } finally { - request.releaseConnection(); - } - logger.info("title:" + title); - logger.info("content:" + stringBuilder.toString()); - sendMail(receivers, title, stringBuilder.toString()); - } - - public static final List<Pair<Long, Long>> findGaps(String cubeName) { - List<CubeSegment> segments = getSortedReadySegments(cubeName); - List<Pair<Long, Long>> gaps = Lists.newArrayList(); - for (int i = 0; i < segments.size() - 1; ++i) { - CubeSegment first = segments.get(i); - CubeSegment second = segments.get(i + 1); - if (first.getDateRangeEnd() == second.getDateRangeStart()) { - continue; - } else if (first.getDateRangeEnd() < second.getDateRangeStart()) { - gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart())); - } - } - return gaps; - } - - private static List<CubeSegment> getSortedReadySegments(String cubeName) { - final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); - Preconditions.checkNotNull(cube); - final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY); - logger.info("totally " + segments.size() + " cubeSegments"); - Collections.sort(segments); - return segments; - } - - public static final List<Pair<String, String>> findOverlaps(String cubeName) { - List<CubeSegment> segments = getSortedReadySegments(cubeName); - List<Pair<String, String>> overlaps = Lists.newArrayList(); - for (int i = 0; i < segments.size() - 1; ++i) { - CubeSegment first = segments.get(i); - CubeSegment second = segments.get(i + 1); - if (first.getDateRangeEnd() == second.getDateRangeStart()) { - continue; - } else { - overlaps.add(Pair.newPair(first.getName(), second.getName())); - } - } - return overlaps; - } - - public void checkCube(List<String> receivers, String cubeName, String host) { - final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); - if (cube == null) { - logger.info("cube:" + cubeName + " does not exist"); - return; - } - List<Pair<Long, Long>> gaps = findGaps(cubeName); - List<Pair<String, String>> overlaps = Lists.newArrayList(); - StringBuilder content = new StringBuilder(); - if (!gaps.isEmpty()) { - content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() { - @Nullable - @Override - public String apply(Pair<Long, Long> input) { - return parseInterval(input); - } - }), "\n")).append("\n"); - } - if (!overlaps.isEmpty()) { - content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n"); - } - if (content.length() > 0) { - logger.info(content.toString()); - sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString()); - } else { - logger.info("no gaps or overlaps"); - } - } - - private String parseInterval(Pair<Long, Long> interval) { - return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString()); - } - - private void sendMail(List<String> receivers, String title, String content) { - final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv()); - mailService.sendMail(receivers, title, content, false); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java deleted file mode 100644 index 95fbc9d..0000000 --- a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.apache.kylin.job.streaming; - -import java.util.List; -import java.util.Properties; - -import javax.annotation.Nullable; - -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.streaming.BrokerConfig; -import org.apache.kylin.streaming.KafkaClusterConfig; -import org.apache.kylin.streaming.StreamingConfig; - -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; - -/** - * Load prepared data into kafka(for test use) - */ -public class KafkaDataLoader { - - public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) { - - KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0); - String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { - @Nullable - @Override - public String apply(BrokerConfig brokerConfig) { - return brokerConfig.getHost() + ":" + brokerConfig.getPort(); - } - }), ","); - Properties props = new Properties(); - props.put("metadata.broker.list", brokerList); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("request.required.acks", "1"); - - ProducerConfig config = new ProducerConfig(props); - - Producer<String, String> producer = new Producer<String, String>(config); - - List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList(); - for (int i = 0; i < messages.size(); ++i) { - KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i)); - keyedMessages.add(keyedMessage); - } - producer.send(keyedMessages); - producer.close(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java index 4212fea..551006f 100644 --- a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java +++ b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java @@ -63,7 +63,7 @@ import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.job.monitor.StreamingMonitor; +import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; import org.apache.kylin.metadata.model.IntermediateColumnDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.hbase.HBaseConnection;
