Repository: ambari
Updated Branches:
  refs/heads/trunk a55c055a8 -> e5c13a3ad
diff --git 
new file mode 100644
index 0000000..a7db3f8
--- /dev/null
@@ -0,0 +1,128 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputKafka.KafkaCallBack;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+public class OutputKafkaTest {
+  private static final Logger LOG = Logger.getLogger(OutputKafkaTest.class);
+  private static final String TEST_TOPIC = "test topic";
+  private OutputKafka outputKafka;
+  @SuppressWarnings("unchecked")
+  private KafkaProducer<String, String> mockKafkaProducer = 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+  /**
+   * Builds the {@link OutputKafka} instance under test before each test method.
+   * The anonymous subclass overrides the producer factory method so that it
+   * hands back {@code mockKafkaProducer} and ignores the supplied properties.
+   */
+  @Before
+  public void init() {
+    outputKafka = new OutputKafka() {
+      // The name "creteKafkaProducer" is kept as-is: because of @Override it has
+      // to match the method declared by OutputKafka.
+      @Override
+      protected KafkaProducer<String, String> creteKafkaProducer(Properties 
props) {
+        return mockKafkaProducer;
+      }
+    };
+  }
+  /**
+   * Writes ten values through {@code outputKafka} and verifies the calls made on
+   * the mocked producer: "value0" is expected through the single-argument
+   * {@code send} (returning {@code mockFuture}), "value1" to "value9" through the
+   * two-argument {@code send} that takes a {@link KafkaCallBack}.
+   */
+  @Test
+  public void testOutputKafka_uploadData() throws Exception {
+    // Minimal configuration: a broker list and a topic.
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("broker_list", "some broker list");
+    config.put("topic", TEST_TOPIC);
+    outputKafka.loadConfig(config);
+    outputKafka.init();
+    // Expectation for the first record: single-argument send returning mockFuture.
+    @SuppressWarnings("unchecked")
+    Future<RecordMetadata> mockFuture = EasyMock.mock(Future.class);
+    EasyMock.expect(mockKafkaProducer.send(new ProducerRecord<String, 
String>(TEST_TOPIC, "value0")))
+        .andReturn(mockFuture);
+    // NOTE(review): mockFuture is never passed to EasyMock.replay() or verify(),
+    // so this expectation looks recorded but not enforced - confirm this is intended.
+    EasyMock.expect(mockFuture.get()).andReturn(null);
+    // Expectations for the remaining nine records: send with any KafkaCallBack, returning null.
+    for (int i = 1; i < 10; i++)
+      EasyMock.expect(mockKafkaProducer.send(EasyMock.eq(new 
ProducerRecord<String, String>(TEST_TOPIC, "value" + i)),
+          EasyMock.anyObject(KafkaCallBack.class))).andReturn(null);
+    EasyMock.replay(mockKafkaProducer);
+    // Write the ten values, each with a fresh InputMarker whose input is a mock.
+    for (int i = 0; i < 10; i++) {
+      InputMarker inputMarker = new InputMarker();
+      inputMarker.input = EasyMock.mock(Input.class);
+      outputKafka.write("value" + i, inputMarker);
+    }
+    // Fails if any expected producer call did not happen.
+    EasyMock.verify(mockKafkaProducer);
+  }
+  /**
+   * Expects {@code outputKafka} to fail with an exception carrying the
+   * "broker_list is needed" message when the configuration contains a topic
+   * but no "broker_list" entry.
+   */
+  @Test
+  public void testOutputKafka_noBrokerList() throws Exception {
+    // The expectations must be registered before the failing call is made.
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("For kafka output, bootstrap broker_list 
is needed");
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("topic", TEST_TOPIC);
+    outputKafka.loadConfig(config);
+    outputKafka.init();
+  }
+  /**
+   * Expects {@code outputKafka} to fail with an exception carrying the
+   * "topic is needed" message when the configuration contains a broker list
+   * but no "topic" entry.
+   */
+  @Test
+  public void testOutputKafka_noTopic() throws Exception {
+    // The expectations must be registered before the failing call is made.
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("For kafka output, topic is needed");
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("broker_list", "some broker list");
+    outputKafka.loadConfig(config);
+    outputKafka.init();
+  }
+  /**
+   * Resets {@code mockKafkaProducer} after each test so that the next test can
+   * set up its own expectations on it.
+   */
+  @After
+  public void cleanUp() {
+    EasyMock.reset(mockKafkaProducer);
+  }
diff --git 
new file mode 100644
index 0000000..afbccca
--- /dev/null
@@ -0,0 +1,165 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+public class OutputSolrTest {
+  private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
+  private OutputSolr outputSolr;
+  private Map<Integer, SolrInputDocument> receivedDocs = new 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+  /**
+   * Creates the {@link OutputSolr} under test before each test method. Its
+   * {@code getSolrClient} override returns a {@link CloudSolrClient} whose
+   * {@code add(Collection)} stores every document in {@code receivedDocs}, keyed by
+   * the value of the document's "id" field, and answers with an
+   * {@link UpdateResponse} wrapping an empty {@link NamedList}.
+   *
+   * @throws Exception kept from the existing signature
+   */
+  @Before
+  public void init() throws Exception {
+    outputSolr = new OutputSolr() {
+      // "throws Exception" already covers MalformedURLException, so the subclass
+      // type is not listed separately.
+      @Override
+      SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception {
+        return new CloudSolrClient(null) {
+          private static final long serialVersionUID = 1L;
+
+          // Captures the documents in place of sending them to a Solr server.
+          @Override
+          public UpdateResponse add(Collection<SolrInputDocument> docs) {
+            for (SolrInputDocument doc : docs) {
+              // The tests always put an Integer under "id", hence the cast.
+              receivedDocs.put((Integer) doc.getField("id").getValue(), doc);
+            }
+            UpdateResponse response = new UpdateResponse();
+            response.setResponse(new NamedList<Object>());
+            return response;
+          }
+        };
+      }
+    };
+  }
+  @Test
+  public void testOutputToSolr_uploadData() throws Exception {
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("url", "some url");
+    config.put("workers", "3");
+    outputSolr.loadConfig(config);
+    outputSolr.init();
+    Map<Integer, SolrInputDocument> expectedDocs = new HashMap<>();
+    int count = 0;
+    for (int i = 0; i < 10; i++) {
+      Map<String, Object> jsonObj = new HashMap<>();
+      for (int j = 0; j < 3; j++)
+        jsonObj.put("name" + ++count, "value" + ++count);
+      jsonObj.put("id", ++count);
+      InputMarker inputMarker = new InputMarker();
+      inputMarker.input = EasyMock.mock(Input.class);
+      outputSolr.write(jsonObj, inputMarker);
+      SolrInputDocument doc = new SolrInputDocument();
+      for (Map.Entry<String, Object> e : jsonObj.entrySet())
+        doc.addField(e.getKey(), e.getValue());
+      expectedDocs.put(count, doc);
+    }
+    Thread.sleep(100);
+    while (outputSolr.getPendingCount() > 0)
+      Thread.sleep(100);
+    int waitToFinish = 0;
+    if (receivedDocs.size() < 10 && waitToFinish < 10) {
+      Thread.sleep(100);
+      waitToFinish++;
+    }
+    Set<Integer> ids = new HashSet<>();
+    ids.addAll(receivedDocs.keySet());
+    ids.addAll(expectedDocs.keySet());
+    for (Integer id : ids) {
+      SolrInputDocument receivedDoc = receivedDocs.get(id);
+      SolrInputDocument expectedDoc = expectedDocs.get(id);
+      assertNotNull("No document received for id: " + id, receivedDoc);
+      assertNotNull("No document expected for id: " + id, expectedDoc);
+      Set<String> fieldNames = new HashSet<>();
+      fieldNames.addAll(receivedDoc.getFieldNames());
+      fieldNames.addAll(expectedDoc.getFieldNames());
+      for (String fieldName : fieldNames) {
+        Object receivedValue = receivedDoc.getFieldValue(fieldName);
+        Object expectedValue = expectedDoc.getFieldValue(fieldName);
+        assertNotNull("No received document field found for id: " + id + ", 
fieldName: " + fieldName, receivedValue);
+        assertNotNull("No expected document field found for id: " + id + ", 
fieldName: " + fieldName, expectedValue);
+        assertEquals("Field value not matching for id: " + id + ", fieldName: 
" + fieldName, receivedValue,
+            expectedValue);
+      }
+    }
+  }
+  /**
+   * Expects {@code outputSolr} to fail with an exception carrying the
+   * "either url or zk_hosts property need to be set" message when the
+   * configuration contains only "workers".
+   */
+  @Test
+  public void testOutputToSolr_noUrlOrZKHost() throws Exception {
+    // The expectations must be registered before the failing call is made.
+    expectedException.expect(Exception.class);
+    expectedException.expectMessage("For solr output, either url or zk_hosts 
property need to be set");
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("workers", "3");
+    outputSolr.loadConfig(config);
+    outputSolr.init();
+  }
+  /**
+   * Empties {@code receivedDocs} after each test so that documents captured by
+   * one test are not visible to the next.
+   */
+  @After
+  public void cleanUp() {
+    receivedDocs.clear();
+  }
diff --git 
new file mode 100644
index 0000000..e641018
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  See the License for the specific language governing permissions and
+  limitations under the License.
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - 
%m%n" />
+      <!-- <param name="ConversionPattern" value="%d [%t] %-5p %c %x - %m%n"/> -->
+    </layout>
+  </appender>
+  <!-- Logs to suppress BEGIN -->
+  <category name="" 
+    <priority value="error" />
+    <appender-ref ref="console" />
+  </category>
+  <category name="apache.solr.client.solrj.impl.CloudSolrClient" 
+    <priority value="fatal" />
+    <appender-ref ref="console" />
+  </category>
+  <!-- Logs to suppress END -->
+  <category name="org.apache.ambari.logfeeder" additivity="false">
+    <priority value="info" />
+    <appender-ref ref="console" /> 
+    <!-- <appender-ref ref="daily_rolling_file" /> -->
+  </category>
+  <root>
+    <priority value="warn" />
+    <!-- <appender-ref ref="console" /> -->
+    <!-- <appender-ref ref="daily_rolling_file" /> -->
+  </root>

Reply via email to