This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 948368fd3210add6e1cb71964c785fb2f2ea3bae
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Fri Jun 19 13:41:09 2020 +0800

    KYLIN-4508 Add unit test for core-metrics module & reporters
---
 .../kylin/metrics/lib/impl/TimedRecordEvent.java   |  32 ++++
 metrics-reporter-hive/pom.xml                      |  25 ++++
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  |   7 +-
 .../metrics/lib/impl/hive/HiveProducerRecord.java  | 141 +++++++-----------
 .../lib/impl/hive/HiveReservoirReporter.java       |  39 +++--
 .../tool/metrics/systemcube/HiveTableCreator.java  |   3 +-
 .../lib/impl/hive/HiveProducerRecordTest.java      |  81 +++++++++++
 .../metrics/lib/impl/hive/HiveProducerTest.java    | 161 +++++++++++++++++++++
 .../lib/impl/hive/HiveReservoirReporterTest.java   |  88 +++++++++++
 metrics-reporter-kafka/pom.xml                     |  19 +++
 .../impl/kafka/KafkaActiveReserviorListener.java   |   8 +
 .../lib/impl/kafka/KafkaReservoirReporter.java     |   6 +-
 .../lib/impl/kafka/KafkaReservoirReporterTest.java |  79 ++++++++++
 13 files changed, 585 insertions(+), 104 deletions(-)

diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
index a866163..984d5f5 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimedRecordEvent.java
@@ -44,4 +44,36 @@ public class TimedRecordEvent extends RecordEvent {
         super.resetTime();
         addTimeDetails();
     }
+
+    public String getYear() {
+        return (String) get(TimePropertyEnum.YEAR.toString());
+    }
+
+    public String getMonth() {
+        return (String) get(TimePropertyEnum.MONTH.toString());
+    }
+
+    public String getWeekBeginDate() {
+        return (String) get(TimePropertyEnum.WEEK_BEGIN_DATE.toString());
+    }
+
+    public String getDayDate() {
+        return (String) get(TimePropertyEnum.DAY_DATE.toString());
+    }
+
+    public String getDayTime() {
+        return (String) get(TimePropertyEnum.DAY_TIME.toString());
+    }
+
+    public int getTimeHour() {
+        return (int) get(TimePropertyEnum.TIME_HOUR.toString());
+    }
+
+    public int getTimeMinute() {
+        return (int) get(TimePropertyEnum.TIME_MINUTE.toString());
+    }
+
+    public int getTimeSecond() {
+        return (int) get(TimePropertyEnum.TIME_SECOND.toString());
+    }
 }
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
index 0437030..9fefa51 100644
--- a/metrics-reporter-hive/pom.xml
+++ b/metrics-reporter-hive/pom.xml
@@ -56,5 +56,30 @@
             <artifactId>hadoop-hdfs</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 8bc7a43..e79010c 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -309,7 +309,7 @@ public class HiveProducer {
         fout = null;
     }
 
-    private HiveProducerRecord convertTo(Record record) throws Exception {
+    HiveProducerRecord convertTo(Record record) throws Exception {
         Map<String, Object> rawValue = record.getValueRaw();
 
         //Set partition values for hive table
@@ -330,7 +330,8 @@ public class HiveProducer {
             columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
         }
 
-        return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs,
-                columnValues);
+        HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder(tableNameSplits.getSecond())
+                .setDbName(tableNameSplits.getFirst()).setPartitionKVs(partitionKVs).build();
+        return new HiveProducerRecord(key, columnValues);
     }
 }
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
index 650d18a..fa5222f 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -30,23 +30,8 @@ public class HiveProducerRecord {
     private final RecordKey key;
     private final List<Object> value;
 
-    public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) {
-        this.key = new RecordKey(dbName, tableName, partitionKVs);
-        this.value = value;
-    }
-
-    public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) {
-        this.key = new RecordKey(tableName, partitionKVs);
-        this.value = value;
-    }
-
-    public HiveProducerRecord(String dbName, String tableName, List<Object> value) {
-        this.key = new RecordKey(dbName, tableName);
-        this.value = value;
-    }
-
-    public HiveProducerRecord(String tableName, List<Object> value) {
-        this.key = new RecordKey(tableName);
+    public HiveProducerRecord(RecordKey key, List<Object> value) {
+        this.key = key;
         this.value = value;
     }
 
@@ -75,41 +60,55 @@ public class HiveProducerRecord {
         return sb.toString();
     }
 
+    @Override
     public boolean equals(Object o) {
-        if (this == o) {
+        if (this == o)
             return true;
-        } else if (!(o instanceof HiveProducerRecord)) {
+        if (o == null || getClass() != o.getClass())
             return false;
-        } else {
-            HiveProducerRecord that = (HiveProducerRecord) o;
-            if (this.key != null) {
-                if (!this.key.equals(that.key)) {
-                    return false;
-                }
-            } else if (that.key != null) {
-                return false;
-            }
-            if (this.value != null) {
-                if (!this.value.equals(that.value)) {
-                    return false;
-                }
-            } else if (that.value != null) {
-                return false;
-            }
-        }
-        return true;
+
+        HiveProducerRecord record = (HiveProducerRecord) o;
+
+        if (key != null ? !key.equals(record.key) : record.key != null)
+            return false;
+        return value != null ? value.equals(record.value) : record.value == null;
     }
 
+    @Override
     public int hashCode() {
-        int result = this.key != null ? this.key.hashCode() : 0;
-        result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
+        int result = key != null ? key.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
         return result;
     }
 
+    public static class KeyBuilder {
+        private final String tableName;
+        private String dbName;
+        private Map<String, String> partitionKVs;
+
+        public KeyBuilder(String tableName) {
+            this.tableName = tableName;
+        }
+
+        public KeyBuilder setDbName(String dbName) {
+            this.dbName = dbName;
+            return this;
+        }
+
+        public KeyBuilder setPartitionKVs(Map<String, String> partitionKVs) {
+            this.partitionKVs = partitionKVs;
+            return this;
+        }
+
+        public RecordKey build() {
+            return new RecordKey(dbName, tableName, partitionKVs);
+        }
+    }
+
     /**
      * Use to organize metrics message
      */
-    public class RecordKey {
+    public static class RecordKey {
         public static final String DEFAULT_DB_NAME = "DEFAULT";
 
         private final String dbName;
@@ -126,18 +125,6 @@ public class HiveProducerRecord {
             this.partitionKVs = partitionKVs;
         }
 
-        public RecordKey(String tableName, Map<String, String> partitionKVs) {
-            this(null, tableName, partitionKVs);
-        }
-
-        public RecordKey(String dbName, String tableName) {
-            this(dbName, tableName, null);
-        }
-
-        public RecordKey(String tableName) {
-            this(null, tableName, null);
-        }
-
         public String database() {
             return this.dbName;
         }
@@ -152,47 +139,31 @@ public class HiveProducerRecord {
 
         public String toString() {
             String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString();
-            return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")";
+            return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs
+                    + ")";
         }
 
+        @Override
         public boolean equals(Object o) {
-            if (this == o) {
+            if (this == o)
                 return true;
-            } else if (!(o instanceof RecordKey)) {
+            if (o == null || getClass() != o.getClass())
                 return false;
-            } else {
-                RecordKey that = (RecordKey) o;
-                if (this.dbName != null) {
-                    if (!this.dbName.equals(that.dbName)) {
-                        return false;
-                    }
-                } else if (that.dbName != null) {
-                    return false;
-                }
-
-                if (this.tableName != null) {
-                    if (!this.tableName.equals(that.tableName)) {
-                        return false;
-                    }
-                } else if (that.tableName != null) {
-                    return false;
-                }
-
-                if (this.partitionKVs != null) {
-                    if (!this.partitionKVs.equals(that.partitionKVs)) {
-                        return false;
-                    }
-                } else if (that.partitionKVs != null) {
-                    return false;
-                }
-            }
-            return true;
+
+            RecordKey recordKey = (RecordKey) o;
+
+            if (dbName != null ? !dbName.equals(recordKey.dbName) : recordKey.dbName != null)
+                return false;
+            if (tableName != null ? !tableName.equals(recordKey.tableName) : recordKey.tableName != null)
+                return false;
+            return partitionKVs != null ? partitionKVs.equals(recordKey.partitionKVs) : recordKey.partitionKVs == null;
         }
 
+        @Override
         public int hashCode() {
-            int result = this.dbName != null ? this.dbName.hashCode() : 0;
-            result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0);
-            result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0);
+            int result = dbName != null ? dbName.hashCode() : 0;
+            result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+            result = 31 * result + (partitionKVs != null ? partitionKVs.hashCode() : 0);
             return result;
         }
     }
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
index 9d93e99..d1e252f 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -84,6 +84,10 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
         stop();
     }
 
+    HiveReservoirListener getListener() {
+        return listener;
+    }
+
     /**
      * A builder for {@link HiveReservoirReporter} instances.
      */
@@ -107,15 +111,19 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
         }
     }
 
-    private class HiveReservoirListener implements ActiveReservoirListener {
+    class HiveReservoirListener implements ActiveReservoirListener {
         private Properties props;
         private Map<String, HiveProducer> producerMap = new HashMap<>();
 
+        private long nRecord = 0;
+        private long nRecordSkip = 0;
+        private long nUpdate = 0;
+
         private HiveReservoirListener(Properties props) throws Exception {
             this.props = props;
         }
 
-        private synchronized HiveProducer getProducer(String metricType) throws Exception {
+        synchronized HiveProducer getProducer(String metricType) throws Exception {
             HiveProducer producer = producerMap.get(metricType);
             if (producer == null) {
                 producer = new HiveProducer(metricType, props);
@@ -129,6 +137,7 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
                 return true;
             }
             logger.info("Try to write {} records", records.size());
+            long prevNRecord = nRecord;
             try {
                 Map<String, List<Record>> queues = new HashMap<>();
                 for (Record record : records) {
@@ -142,21 +151,17 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
                 for (Map.Entry<String, List<Record>> entry : queues.entrySet()) {
                     HiveProducer producer = getProducer(entry.getKey());
                     producer.send(entry.getValue());
+                    nRecord += entry.getValue().size();
                 }
                 queues.clear();
+                if (nUpdate++ % 100 == 0) {
+                    logger.info("Has done the update {} times with {} records reported, {} records skipped", nUpdate,
+                            nRecord, nRecordSkip);
+                }
             } catch (Exception e) {
+                nRecordSkip += records.size() - (nRecord - prevNRecord);
                 logger.error(e.getMessage(), e);
-                return false;
-            }
-            return true;
-        }
-
-        public boolean onRecordUpdate(final Record record) {
-            try {
-                HiveProducer producer = getProducer(record.getSubject());
-                producer.send(record);
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+                logger.warn("Has skipped reporting {} records", nRecordSkip);
                 return false;
             }
             return true;
@@ -168,5 +173,13 @@ public class HiveReservoirReporter extends ActiveReservoirReporter {
             }
             producerMap.clear();
         }
+
+        public long getNRecord() {
+            return nRecord;
+        }
+
+        public long getNRecordSkip() {
+            return nRecordSkip;
+        }
     }
 }
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
similarity index 99%
rename from tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
rename to metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 35d9efb..2f9eb1d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -19,8 +19,8 @@
 package org.apache.kylin.tool.metrics.systemcube;
 
 import java.util.List;
-
 import java.util.Locale;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
@@ -32,7 +32,6 @@ import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
 import org.apache.kylin.metrics.property.QueryPropertyEnum;
 import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
new file mode 100644
index 0000000..ead74ad
--- /dev/null
+++ b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecordTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.DELIMITER;
+import static org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey.DEFAULT_DB_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class HiveProducerRecordTest {
+
+    @Test
+    public void testRecord() {
+        String dbName = "KYLIN";
+        String tableName = "test";
+        Map<String, String> partitionKVs = Maps.newHashMap();
+        partitionKVs.put("key1", "value1");
+
+        Set<RecordKey> keySet = Sets.newHashSet();
+        RecordKey key1 = new HiveProducerRecord.KeyBuilder(tableName).build();
+        RecordKey key11 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(DEFAULT_DB_NAME).build();
+        keySet.add(key1);
+        keySet.add(key11);
+        assertEquals(1, keySet.size());
+
+        RecordKey key2 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).build();
+        RecordKey key3 = new HiveProducerRecord.KeyBuilder(tableName).setDbName(dbName).setPartitionKVs(partitionKVs)
+                .build();
+        keySet.add(key2);
+        keySet.add(key3);
+        assertEquals(3, keySet.size());
+        assertEquals(dbName, key2.database());
+        assertEquals(tableName, key2.table());
+
+        List<Object> value1 = Lists.<Object> newArrayList(1);
+        List<Object> value2 = Lists.<Object> newArrayList(1, "1");
+
+        assertNull(new HiveProducerRecord(key1, null).valueToString());
+
+        Set<HiveProducerRecord> recordSet = Sets.newHashSet();
+        HiveProducerRecord record1 = new HiveProducerRecord(key1, value1);
+        HiveProducerRecord record11 = new HiveProducerRecord(key11, value1);
+        recordSet.add(record1);
+        recordSet.add(record11);
+        assertEquals(1, recordSet.size());
+        assertEquals(key1, record1.key());
+        assertEquals(value1, record1.value());
+
+        recordSet.add(new HiveProducerRecord(key1, value2));
+        recordSet.add(new HiveProducerRecord(key2, value1));
+        assertEquals(3, recordSet.size());
+        assertEquals(1, record1.valueToString().split(DELIMITER).length);
+    }
+}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
new file mode 100644
index 0000000..2adc34f
--- /dev/null
+++ b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.source.hive.HiveMetaStoreClientFactory;
+import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+@PrepareForTest(fullyQualifiedNames = { "org.apache.hadoop.fs.FileSystem",
+        "org.apache.kylin.source.hive.HiveMetaStoreClientFactory",
+        "org.apache.kylin.metrics.lib.impl.hive.HiveProducer$1" })
+public class HiveProducerTest {
+
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private HiveProducer hiveProducer;
+    private HiveMetaStoreClient metaStoreClient;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
+
+        FileSystem hdfs = PowerMockito.mock(FileSystem.class);
+        URI uri = PowerMockito.mock(URI.class);
+        PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", Configuration.class)).toReturn(hdfs);
+        PowerMockito.when(hdfs.getUri()).thenReturn(uri);
+        PowerMockito.when(uri.toString()).thenReturn("hdfs");
+
+        HiveConf hiveConf = PowerMockito.mock(HiveConf.class);
+        String metricsType = new HiveSink()
+                .getTableFromSubject(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
+
+        hiveProducer = new HiveProducer(metricsType, new Properties(), hiveConf);
+
+        metaStoreClient = PowerMockito.mock(HiveMetaStoreClient.class);
+        PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConf).thenReturn(metaStoreClient);
+        PowerMockito
+                .stub(PowerMockito.method(HiveMetaStoreClientFactory.class, "getHiveMetaStoreClient", HiveConf.class))
+                .toReturn(metaStoreClient);
+    }
+
+    @After
+    public void after() throws Exception {
+        System.clearProperty(KylinConfig.KYLIN_CONF);
+    }
+
+    @Test
+    public void testProduce() throws Exception {
+        TimedRecordEvent rpcEvent = generateTestRPCRecord();
+
+        Map<String, String> partitionKVs = Maps.newHashMap();
+        partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rpcEvent.getDayDate());
+
+        List<Object> value = Lists.newArrayList(rpcEvent.getHost(), "default", "test_cube", "sandbox", "NULL", 80L, 3L,
+                3L, 0L, 0L, 0L, rpcEvent.getTime(), rpcEvent.getYear(), rpcEvent.getMonth(),
+                rpcEvent.getWeekBeginDate(), rpcEvent.getDayTime(), rpcEvent.getTimeHour(), rpcEvent.getTimeMinute(),
+                rpcEvent.getTimeSecond(), rpcEvent.getDayDate());
+
+        HiveProducerRecord.RecordKey key = new HiveProducerRecord.KeyBuilder("HIVE_metrics_query_rpc_test")
+                .setDbName("KYLIN").setPartitionKVs(partitionKVs).build();
+        HiveProducerRecord target = new HiveProducerRecord(key, value);
+
+        prepareMockForEvent(rpcEvent);
+        assertEquals(target, hiveProducer.convertTo(rpcEvent));
+    }
+
+    private void prepareMockForEvent(RecordEvent event) throws Exception {
+        String tableFullName = new HiveSink().getTableFromSubject(event.getEventType());
+        Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableFullName);
+        String dbName = tableNameSplits.getFirst();
+        String tableName = tableNameSplits.getSecond();
+
+        Table table = PowerMockito.mock(Table.class);
+        PowerMockito.when(metaStoreClient, "getTable", dbName, tableName).thenReturn(table);
+
+        StorageDescriptor sd = PowerMockito.mock(StorageDescriptor.class);
+        PowerMockito.when(table, "getSd").thenReturn(sd);
+        PowerMockito.when(sd, "getLocation").thenReturn(null);
+
+        List<Pair<String, String>> columns = HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
+        List<Pair<String, String>> partitions = HiveTableCreator.getPartitionKVsForHiveTable();
+        columns.addAll(partitions);
+        List<FieldSchema> fields = Lists.newArrayListWithExpectedSize(columns.size());
+        for (Pair<String, String> column : columns) {
+            fields.add(new FieldSchema(column.getFirst(), column.getSecond(), null));
+        }
+        PowerMockito.when(metaStoreClient, "getFields", dbName, tableName).thenReturn(fields);
+    }
+
+    private TimedRecordEvent generateTestRPCRecord() {
+        TimedRecordEvent rpcMetricsEvent = new TimedRecordEvent("metrics_query_rpc_test");
+        setRPCWrapper(rpcMetricsEvent, "default", "test_cube", "sandbox", null);
+        setRPCStats(rpcMetricsEvent, 80L, 0L, 3L, 3L, 0L);
+        return rpcMetricsEvent;
+    }
+
+    private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
+            String rpcServer, Throwable throwable) {
+        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer);
+        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
+                throwable == null ? "NULL" : throwable.getClass().getName());
+    }
+
+    private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount,
+            long returnCount, long aggrCount) {
+        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs);
+        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter
+        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server
+        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor
+        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor
+    }
+}
diff --git a/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
new file mode 100644
index 0000000..fbb656c
--- /dev/null
+++ b/metrics-reporter-hive/src/test/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporterTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.InstantReservoir;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+import com.google.common.collect.Lists;
+
+@PrepareForTest({ HiveReservoirReporter.HiveReservoirListener.class })
+public class HiveReservoirReporterTest { // checks the record counters of HiveReservoirReporter's listener, with HiveProducer replaced by a mock
+
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private HiveReservoirReporter hiveReporter;
+    private ActiveReservoir reservoir;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
+
+        HiveProducer hiveProducer = PowerMockito.mock(HiveProducer.class);
+        PowerMockito.whenNew(HiveProducer.class).withAnyArguments().thenReturn(hiveProducer); // any "new HiveProducer(...)" inside the prepared listener class yields the mock
+
+        reservoir = new InstantReservoir();
+        reservoir.start();
+        hiveReporter = HiveReservoirReporter.forRegistry(reservoir).build();
+    }
+
+    @After
+    public void after() throws Exception {
+        System.clearProperty(KylinConfig.KYLIN_CONF); // undo the system property set in setUp()
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        String metricsType = "TEST";
+        Record record = new RecordEvent(metricsType);
+        reservoir.update(record); // reporter not started yet, so nothing is expected to be counted
+        assertEquals(0, hiveReporter.getListener().getNRecord());
+
+        hiveReporter.start();
+        reservoir.update(record);
+        reservoir.update(record);
+        assertEquals(2, hiveReporter.getListener().getNRecord());
+
+        hiveReporter.stop();
+        reservoir.update(record); // expected to be ignored while the reporter is stopped
+        assertEquals(2, hiveReporter.getListener().getNRecord());
+
+        hiveReporter.start();
+        reservoir.update(record);
+        PowerMockito.doThrow(new Exception()).when(hiveReporter.getListener().getProducer(metricsType))
+                .send(Lists.newArrayList(record)); // from here on the mocked producer fails when sending this record
+        reservoir.update(record);
+        assertEquals(3, hiveReporter.getListener().getNRecord()); // the failed update is expected to show up in nRecordSkip, not nRecord
+        assertEquals(1, hiveReporter.getListener().getNRecordSkip());
+    }
+}
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
index e5f391e..b22e01e 100644
--- a/metrics-reporter-kafka/pom.xml
+++ b/metrics-reporter-kafka/pom.xml
@@ -42,5 +42,24 @@
             <artifactId>kafka_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency> <!-- JUnit, test scope only; no version is declared here (presumably managed elsewhere, e.g. a parent POM - confirm) -->
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency> <!-- PowerMock Mockito API, test scope; version is taken from the powermock.version property -->
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency> <!-- PowerMock JUnit4 rule (agent) module, test scope; same powermock.version property -->
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
index df79c57..b1a1bd1 100644
--- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -115,4 +115,12 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis
         logger.debug("Cannot find topic {}", topic);
         topicsIfAvailable.put(topic, System.currentTimeMillis());
     }
+
+    public long getNRecord() { // read accessor for the nRecord counter (field declared outside this hunk)
+        return nRecord;
+    }
+
+    public long getNRecordSkip() { // read accessor for the nRecordSkip counter (field declared outside this hunk)
+        return nRecordSkip;
+    }
 }
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
index a7b58a6..97b839c 100644
--- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -88,6 +88,10 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter {
         stop();
     }
 
+    KafkaReservoirListener getListener() { // package-private: lets same-package code (e.g. the unit test) reach the listener
+        return listener;
+    }
+
     /**
      * A builder for {@link KafkaReservoirReporter} instances.
      */
@@ -113,7 +117,7 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter {
         }
     }
 
-    private class KafkaReservoirListener extends KafkaActiveReserviorListener {
+    class KafkaReservoirListener extends KafkaActiveReserviorListener {
         protected final Producer<byte[], byte[]> producer;
 
         private KafkaReservoirListener(Properties props) {
diff --git a/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
new file mode 100644
index 0000000..4a14e66
--- /dev/null
+++ b/metrics-reporter-kafka/src/test/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporterTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.InstantReservoir;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+
+@PrepareForTest({ KafkaReservoirReporter.KafkaReservoirListener.class })
+public class KafkaReservoirReporterTest { // checks the record counters of KafkaReservoirReporter's listener, with KafkaProducer replaced by a mock
+
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private KafkaReservoirReporter kafkaReporter;
+    private ActiveReservoir reservoir;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
+
+        KafkaProducer kafkaProducer = PowerMockito.mock(KafkaProducer.class);
+        PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducer); // any "new KafkaProducer(...)" inside the prepared listener class yields the mock
+
+        reservoir = new InstantReservoir();
+        reservoir.start();
+        kafkaReporter = KafkaReservoirReporter.forRegistry(reservoir).build();
+    }
+
+    @After
+    public void after() throws Exception {
+        System.clearProperty(KylinConfig.KYLIN_CONF); // undo the system property set in setUp()
+    }
+
+    @Test
+    public void testUpdate() {
+        Record record = new RecordEvent("TEST");
+        reservoir.update(record); // reporter not started yet, so nothing is expected to be counted
+        assertEquals(0, kafkaReporter.getListener().getNRecord());
+
+        kafkaReporter.start();
+        reservoir.update(record);
+        reservoir.update(record);
+        assertEquals(2, kafkaReporter.getListener().getNRecord());
+
+        kafkaReporter.stop();
+        reservoir.update(record); // expected to be ignored while the reporter is stopped
+        assertEquals(2, kafkaReporter.getListener().getNRecord());
+        assertEquals(0, kafkaReporter.getListener().getNRecordSkip());
+    }
+}

Reply via email to