Repository: flume Updated Branches: refs/heads/flume-1.5 bf89b590e -> 042f53a5f
http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java new file mode 100644 index 0000000..bef2ac6 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import com.google.common.collect.Maps; +import org.apache.flume.event.SimpleEvent; +import org.joda.time.DateTimeUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TimestampedEventTest { + static final long FIXED_TIME_MILLIS = 123456789L; + + @Before + public void setFixedJodaTime() { + DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS); + } + + @Test + public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp()); + assertEquals(String.valueOf(FIXED_TIME_MILLIS), + timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldUseExistingTimestampHeaderInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + Map<String, String> headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("timestamp", "-321"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(-321L, timestampedEvent.getTimestamp()); + assertEquals("-321", timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + Map<String, String> headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("@timestamp", "-999"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(-999L, timestampedEvent.getTimestamp()); + assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp")); + assertNull(timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + base.setBody(new byte[] {1,2,3,4}); + Map<String, String> headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("foo", "bar"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals("bar", timestampedEvent.getHeaders().get("foo")); + assertArrayEquals(base.getBody(), timestampedEvent.getBody()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java new file mode 100644 index 0000000..38e7399 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014 Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.elasticsearch.client; + +import java.util.Arrays; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.Test; + +public class RoundRobinListTest { + + private RoundRobinList<String> fixture; + + @Before + public void setUp() { + fixture = new RoundRobinList<String>(Arrays.asList("test1", "test2")); + } + + @Test + public void shouldReturnNextElement() { + assertEquals("test1", fixture.get()); + assertEquals("test2", fixture.get()); + assertEquals("test1", fixture.get()); + assertEquals("test2", fixture.get()); + assertEquals("test1", fixture.get()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java new file mode 100644 index 0000000..4b70b65 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; +import org.mockito.Mock; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestElasticSearchClientFactory { + + ElasticSearchClientFactory factory; + + @Mock + ElasticSearchEventSerializer serializer; + + @Before + public void setUp() { + initMocks(this); + factory = new ElasticSearchClientFactory(); + } + + @Test + public void shouldReturnTransportClient() throws Exception { + String[] hostNames = { "127.0.0.1" }; + Object o = factory.getClient(ElasticSearchClientFactory.TransportClient, + hostNames, "test", serializer, null); + assertThat(o, instanceOf(ElasticSearchTransportClient.class)); + } + + @Test + public void shouldReturnRestClient() throws NoSuchClientTypeException { + String[] hostNames = { "127.0.0.1" }; + Object o = factory.getClient(ElasticSearchClientFactory.RestClient, + hostNames, "test", serializer, null); + assertThat(o, instanceOf(ElasticSearchRestClient.class)); + } + + @Test(expected=NoSuchClientTypeException.class) + public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException { + String[] hostNames = {"127.0.0.1"}; + factory.getClient("not_existing_client", hostNames, "test", null, null); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java new file mode 100644 index 0000000..b7d8822 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.BytesStream; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; + +import java.io.IOException; +import java.util.List; + +import static junit.framework.Assert.assertEquals; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.common.bytes.BytesArray; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestElasticSearchRestClient { + + private ElasticSearchRestClient fixture; + + @Mock + private ElasticSearchEventSerializer serializer; + + @Mock + private IndexNameBuilder nameBuilder; + + @Mock + private Event event; + + @Mock + private HttpClient httpClient; + + @Mock + private HttpResponse httpResponse; + + @Mock + private StatusLine httpStatus; + + @Mock + private HttpEntity httpEntity; + + private static final String INDEX_NAME = "foo_index"; + private static final String MESSAGE_CONTENT = "{\"body\":\"test\"}"; + private static final String[] HOSTS = {"host1", "host2"}; + + @Before + public void setUp() throws IOException { + initMocks(this); + BytesReference bytesReference = mock(BytesReference.class); + BytesStream bytesStream = mock(BytesStream.class); + + when(nameBuilder.getIndexName(any(Event.class))).thenReturn(INDEX_NAME); + when(bytesReference.toBytesArray()).thenReturn(new BytesArray(MESSAGE_CONTENT)); + when(bytesStream.bytes()).thenReturn(bytesReference); + when(serializer.getContentBuilder(any(Event.class))).thenReturn(bytesStream); + fixture = new ElasticSearchRestClient(HOSTS, serializer, httpClient); + } + + @Test + public void shouldAddNewEventWithoutTTL() throws Exception { + ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", -1); + fixture.execute(); + + verify(httpClient).execute(isA(HttpUriRequest.class)); + verify(httpClient).execute(argument.capture()); + + assertEquals("http://host1/_bulk", argument.getValue().getURI().toString()); + assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\"}}\n" + MESSAGE_CONTENT + "\n", + EntityUtils.toString(argument.getValue().getEntity())); + } + + @Test + public void shouldAddNewEventWithTTL() throws Exception { + ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", 123); + fixture.execute(); + + verify(httpClient).execute(isA(HttpUriRequest.class)); + verify(httpClient).execute(argument.capture()); + + assertEquals("http://host1/_bulk", argument.getValue().getURI().toString()); + assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n" + + MESSAGE_CONTENT + "\n", EntityUtils.toString(argument.getValue().getEntity())); + } + + @Test(expected = EventDeliveryException.class) + public void shouldThrowEventDeliveryException() throws Exception { + ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", 123); + fixture.execute(); + } + + @Test() + public void shouldRetryBulkOperation() throws Exception { + ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", 123); + fixture.execute(); + + verify(httpClient, times(2)).execute(isA(HttpUriRequest.class)); + verify(httpClient, times(2)).execute(argument.capture()); + + List<HttpPost> allValues = argument.getAllValues(); + assertEquals("http://host1/_bulk", allValues.get(0).getURI().toString()); + assertEquals("http://host2/_bulk", allValues.get(1).getURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/042f53a5/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java new file mode 100644 index 0000000..b7b8e74 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.BytesStream; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestElasticSearchTransportClient { + + private ElasticSearchTransportClient fixture; + + @Mock + private ElasticSearchEventSerializer serializer; + + @Mock + private IndexNameBuilder nameBuilder; + + @Mock + private Client elasticSearchClient; + + @Mock + private BulkRequestBuilder bulkRequestBuilder; + + @Mock + private IndexRequestBuilder indexRequestBuilder; + + @Mock + private Event event; + + @Before + public void setUp() throws IOException { + initMocks(this); + BytesReference bytesReference = mock(BytesReference.class); + BytesStream bytesStream = mock(BytesStream.class); + + when(nameBuilder.getIndexName(any(Event.class))).thenReturn("foo_index"); + when(bytesReference.toBytes()).thenReturn("{\"body\":\"test\"}".getBytes()); + when(bytesStream.bytes()).thenReturn(bytesReference); + when(serializer.getContentBuilder(any(Event.class))) + .thenReturn(bytesStream); + when(elasticSearchClient.prepareIndex(anyString(), anyString())) + .thenReturn(indexRequestBuilder); + when(indexRequestBuilder.setSource(bytesReference)).thenReturn( + indexRequestBuilder); + + fixture = new ElasticSearchTransportClient(elasticSearchClient, serializer); + fixture.setBulkRequestBuilder(bulkRequestBuilder); + } + + @Test + public void shouldAddNewEventWithoutTTL() throws Exception { + fixture.addEvent(event, nameBuilder, "bar_type", -1); + verify(indexRequestBuilder).setSource( + serializer.getContentBuilder(event).bytes()); + verify(bulkRequestBuilder).add(indexRequestBuilder); + } + + @Test + public void shouldAddNewEventWithTTL() throws Exception { + fixture.addEvent(event, nameBuilder, "bar_type", 10); + verify(indexRequestBuilder).setTTL(10); + verify(indexRequestBuilder).setSource( + serializer.getContentBuilder(event).bytes()); + } + + @Test + public void shouldExecuteBulkRequestBuilder() throws Exception { + ListenableActionFuture<BulkResponse> action = + (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class); + BulkResponse response = mock(BulkResponse.class); + when(bulkRequestBuilder.execute()).thenReturn(action); + when(action.actionGet()).thenReturn(response); + when(response.hasFailures()).thenReturn(false); + + fixture.addEvent(event, nameBuilder, "bar_type", 10); + fixture.execute(); + verify(bulkRequestBuilder).execute(); + } + + @Test(expected = EventDeliveryException.class) + public void shouldThrowExceptionOnExecuteFailed() throws Exception { + ListenableActionFuture<BulkResponse> action = + (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class); + BulkResponse response = mock(BulkResponse.class); + when(bulkRequestBuilder.execute()).thenReturn(action); + when(action.actionGet()).thenReturn(response); + when(response.hasFailures()).thenReturn(true); + + fixture.addEvent(event, nameBuilder, "bar_type", 10); + fixture.execute(); + } +}