Repository: kylin Updated Branches: refs/heads/yaho-cube-planner 47b5a0ded -> 02b67cebf
http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java new file mode 100644 index 0000000..5af2bf9 --- /dev/null +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl.hive; + +import java.util.List; +import java.util.Properties; + +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.ReporterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reporter which listens for new records and publishes them to hive. + */ +public class HiveReservoirReporter extends ActiveReservoirReporter { + + public static final String HIVE_REPORTER_SUFFIX = "HIVE"; + public static final HiveSink sink = new HiveSink(); + protected static final Logger logger = LoggerFactory.getLogger(HiveReservoirReporter.class); + private final ActiveReservoir activeReservoir; + private final HiveReservoirListener listener; + + public HiveReservoirReporter(ActiveReservoir activeReservoir, Properties props) throws Exception { + this.activeReservoir = activeReservoir; + this.listener = new HiveReservoirListener(props); + } + + /** + * Returns a new {@link Builder} for {@link HiveReservoirReporter}. + * + * @param activeReservoir the registry to report + * @return a {@link Builder} instance for a {@link HiveReservoirReporter} + */ + public static Builder forRegistry(ActiveReservoir activeReservoir) { + return new Builder(activeReservoir); + } + + public static String getTableFromSubject(String subject) { + return sink.getTableFromSubject(subject); + } + + /** + * Starts the reporter. + */ + public void start() { + activeReservoir.addListener(listener); + } + + /** + * Stops the reporter. + */ + public void stop() { + activeReservoir.removeListener(listener); + } + + /** + * Stops the reporter. + */ + @Override + public void close() { + stop(); + } + + /** + * A builder for {@link HiveReservoirReporter} instances. + */ + public static class Builder extends ReporterBuilder { + + private Builder(ActiveReservoir activeReservoir) { + super(activeReservoir); + } + + private void setFixedProperties() { + } + + /** + * Builds a {@link HiveReservoirReporter} with the given properties. + * + * @return a {@link HiveReservoirReporter} + */ + public HiveReservoirReporter build() throws Exception { + setFixedProperties(); + return new HiveReservoirReporter(registry, props); + } + } + + private class HiveReservoirListener implements ActiveReservoirListener { + + HiveProducer producer; + + private HiveReservoirListener(Properties props) throws Exception { + producer = new HiveProducer(props); + } + + public boolean onRecordUpdate(final List<Record> records) { + try { + producer.send(records); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return false; + } + return true; + } + + public boolean onRecordUpdate(final Record record) { + try { + producer.send(record); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return false; + } + return true; + } + + public void close() { + producer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java new file mode 100644 index 0000000..3b0eefe --- /dev/null +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl.hive; + +import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX; +import static org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter.HIVE_REPORTER_SUFFIX; + +import org.apache.kylin.metrics.lib.Sink; + +public class HiveSink implements Sink { + public String getTableFromSubject(String subject) { + return KYLIN_PREFIX + "." + HIVE_REPORTER_SUFFIX + "_" + subject; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml new file mode 100644 index 0000000..bf83b4b --- /dev/null +++ b/metrics-reporter-kafka/pom.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kylin-metrics-reporter-kafka</artifactId> + <packaging>jar</packaging> + <name>Apache Kylin - Metrics Reporter Kafka</name> + <description>Apache Kylin - Metrics Reporter Kafka</description> + + <parent> + <artifactId>kylin</artifactId> + <groupId>org.apache.kylin</groupId> + <version>2.1.0</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-metrics</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java new file mode 100644 index 0000000..311f3e3 --- /dev/null +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl.kafka; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kylin.metrics.lib.ActiveReservoirListener; +import org.apache.kylin.metrics.lib.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class KafkaActiveReserviorListener implements ActiveReservoirListener { + + public static final long TOPIC_AVAILABLE_TAG = 0L; + protected static final Logger logger = LoggerFactory.getLogger(KafkaActiveReserviorListener.class); + protected Long maxBlockMs = 1800000L; + protected int maxRecordForLogNum = 10000; + protected int maxRecordSkipForLogNum = 10000; + protected ConcurrentHashMap<String, Long> topicsIfAvailable = new ConcurrentHashMap<>(); + private int nRecord = 0; + private int nRecordSkip = 0; + private Callback produceCallback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + exception.printStackTrace(); + return; + } + logger.info("topic:" + metadata.topic() + "; partition: " + metadata.partition() + "; offset: " + metadata.offset()); + } + }; + + protected abstract String decorateTopic(String topic); + + protected abstract void tryFetchMetadataFor(String topic); + + protected abstract void send(String topic, Record record, Callback callback); + + protected void sendWrapper(String topic, Record record, Callback callback) { + try { + send(topic, record, callback); + } catch (org.apache.kafka.common.errors.TimeoutException e) { + setUnAvailable(topic); + throw e; + } + } + + public boolean onRecordUpdate(final List<Record> records) { + try { + for (Record record : records) { + String topic = decorateTopic(record.getType()); + if (!checkAvailable(topic)) { + if (nRecordSkip % maxRecordSkipForLogNum == 0) { + nRecordSkip = 0; + logger.warn("Skip to send record to topic " + topic); + } + nRecordSkip++; + continue; + } + if (nRecord % maxRecordForLogNum == 0) { + nRecord = 0; + sendWrapper(topic, record, produceCallback); + } else { + sendWrapper(topic, record, null); + } + nRecord++; + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + return false; + } + return true; + } + + protected boolean checkAvailable(String topic) { + Long timeBlock = topicsIfAvailable.get(topic); + if (timeBlock != null && timeBlock == TOPIC_AVAILABLE_TAG) { + return true; + } else if (timeBlock == null || System.currentTimeMillis() - timeBlock > maxBlockMs) { + try { + tryFetchMetadataFor(topic); + topicsIfAvailable.put(topic, TOPIC_AVAILABLE_TAG); + return true; + } catch (org.apache.kafka.common.errors.TimeoutException e) { + logger.warn("Fail to fetch metadata for topic " + topic); + setUnAvailable(topic); + return false; + } + } + return false; + } + + protected void setUnAvailable(String topic) { + topicsIfAvailable.put(topic, System.currentTimeMillis()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java new file mode 100644 index 0000000..a5ea3aa --- /dev/null +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl.kafka; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kylin.metrics.lib.ActiveReservoir; +import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +import org.apache.kylin.metrics.lib.Record; +import org.apache.kylin.metrics.lib.impl.ReporterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reporter which listens for new records and publishes them to Kafka. + */ +public class KafkaReservoirReporter extends ActiveReservoirReporter { + + public static final String KAFKA_REPORTER_SUFFIX = "KAFKA"; + public static final KafkaSink sink = new KafkaSink(); + protected static final Logger logger = LoggerFactory.getLogger(KafkaReservoirReporter.class); + private final ActiveReservoir activeReservoir; + private final KafkaReservoirListener listener; + + private KafkaReservoirReporter(ActiveReservoir activeReservoir, Properties props) { + this.activeReservoir = activeReservoir; + this.listener = new KafkaReservoirListener(props); + } + + /** + * Returns a new {@link Builder} for {@link KafkaReservoirReporter}. + * + * @param activeReservoir the registry to report + * @return a {@link Builder} instance for a {@link KafkaReservoirReporter} + */ + public static Builder forRegistry(ActiveReservoir activeReservoir) { + return new Builder(activeReservoir); + } + + private static String decorateTopic(String topic) { + return ActiveReservoirReporter.KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic; + } + + public static String getTableFromSubject(String subject) { + return sink.getTableFromSubject(subject); + } + + /** + * Starts the reporter. + */ + public void start() { + activeReservoir.addListener(listener); + } + + /** + * Stops the reporter. + */ + public void stop() { + activeReservoir.removeListener(listener); + } + + /** + * Stops the reporter. + */ + @Override + public void close() { + stop(); + } + + /** + * A builder for {@link KafkaReservoirReporter} instances. + */ + public static class Builder extends ReporterBuilder { + + private Builder(ActiveReservoir activeReservoir) { + super(activeReservoir); + } + + private void setFixedProperties() { + props.put("key.serializer", ByteArraySerializer.class.getName()); + props.put("value.serializer", ByteArraySerializer.class.getName()); + } + + /** + * Builds a {@link KafkaReservoirReporter} with the given properties. + * + * @return a {@link KafkaReservoirReporter} + */ + public KafkaReservoirReporter build() { + setFixedProperties(); + return new KafkaReservoirReporter(registry, props); + } + } + + private class KafkaReservoirListener extends KafkaActiveReserviorListener { + protected final Producer<byte[], byte[]> producer; + + private KafkaReservoirListener(Properties props) { + producer = new KafkaProducer<>(props); + } + + public void tryFetchMetadataFor(String topic) { + producer.partitionsFor(topic); + } + + protected String decorateTopic(String topic) { + return KafkaReservoirReporter.decorateTopic(topic); + } + + protected void send(String topic, Record record, Callback callback) { + producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback); + } + + public void close() { + producer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java new file mode 100644 index 0000000..f756b8a --- /dev/null +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metrics.lib.impl.kafka; + +import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX; + +import org.apache.kylin.metrics.lib.Sink; + +public class KafkaSink implements Sink { + public String getTableFromSubject(String subject) { + return KYLIN_PREFIX + "." + KafkaReservoirReporter.KAFKA_REPORTER_SUFFIX + "_" + subject; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1c58791..064d0c4 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,9 @@ <calcite.version>1.12.0-kylin-r2</calcite.version> <avatica.version>1.9.0</avatica.version> + <!-- Dropwizard Metrics Version --> + <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version> + <!-- Hadoop Common deps, keep compatible with hadoop2.version --> <zookeeper.version>3.4.8</zookeeper.version> <curator.version>2.7.1</curator.version> @@ -1124,6 +1127,9 @@ <module>tool-assembly</module> <module>kylin-it</module> <module>tomcat-ext</module> + <module>core-metrics</module> + <module>metrics-reporter-hive</module> + <module>metrics-reporter-kafka</module> </modules> <reporting>