Repository: flume Updated Branches: refs/heads/trunk 5f6d6c3b1 -> 4d7124700
FLUME-2628. Add an optional parameter to specify the expected input text encoding for the netcat source (Lionel Herbet via Johny Rufus) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d712470 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d712470 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d712470 Branch: refs/heads/trunk Commit: 4d7124700f678169756df81aa110bd979a4bcb93 Parents: 5f6d6c3 Author: Johny Rufus <[email protected]> Authored: Mon Jul 13 18:01:10 2015 -0700 Committer: Johny Rufus <[email protected]> Committed: Fri Jul 17 09:58:22 2015 -0700 ---------------------------------------------------------------------- .../org/apache/flume/source/NetcatSource.java | 13 +- .../NetcatSourceConfigurationConstants.java | 5 + .../apache/flume/source/TestNetcatSource.java | 374 +++++++++++++++++++ 3 files changed, 390 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4d712470/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java index 2da38bb..61e3f90 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java @@ -113,6 +113,7 @@ public class NetcatSource extends AbstractSource implements Configurable, private int port; private int maxLineLength; private boolean ackEveryEvent; + private String sourceEncoding; private CounterGroup counterGroup; private ServerSocketChannel serverSocket; @@ -142,6 +143,10 @@ public class NetcatSource extends AbstractSource implements Configurable, maxLineLength = context.getInteger( 
NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH, NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH); + sourceEncoding = java.nio.charset.Charset.forName(context.getString( + NetcatSourceConfigurationConstants.CONFIG_SOURCE_ENCODING, + NetcatSourceConfigurationConstants.DEFAULT_ENCODING + )).name(); } @Override @@ -176,6 +181,7 @@ public class NetcatSource extends AbstractSource implements Configurable, acceptRunnable.ackEveryEvent = ackEveryEvent; acceptRunnable.source = this; acceptRunnable.serverSocket = serverSocket; + acceptRunnable.sourceEncoding = sourceEncoding; acceptThread = new Thread(acceptRunnable); @@ -251,6 +257,7 @@ public class NetcatSource extends AbstractSource implements Configurable, private EventDrivenSource source; private AtomicBoolean shouldStop; private boolean ackEveryEvent; + private String sourceEncoding; private final int maxLineLength; @@ -272,6 +279,7 @@ public class NetcatSource extends AbstractSource implements Configurable, request.counterGroup = counterGroup; request.source = source; request.ackEveryEvent = ackEveryEvent; + request.sourceEncoding = sourceEncoding; handlerService.submit(request); @@ -294,6 +302,7 @@ public class NetcatSource extends AbstractSource implements Configurable, private CounterGroup counterGroup; private SocketChannel socketChannel; private boolean ackEveryEvent; + private String sourceEncoding; private final int maxLineLength; @@ -307,8 +316,8 @@ public class NetcatSource extends AbstractSource implements Configurable, Event event = null; try { - Reader reader = Channels.newReader(socketChannel, "utf-8"); - Writer writer = Channels.newWriter(socketChannel, "utf-8"); + Reader reader = Channels.newReader(socketChannel, sourceEncoding); + Writer writer = Channels.newWriter(socketChannel, sourceEncoding); CharBuffer buffer = CharBuffer.allocate(maxLineLength); buffer.flip(); // flip() so fill() sees buffer as initially empty 
http://git-wip-us.apache.org/repos/asf/flume/blob/4d712470/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java index fdf318a..1720d5f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java @@ -42,4 +42,9 @@ public class NetcatSourceConfigurationConstants { public static final String CONFIG_MAX_LINE_LENGTH = "max-line-length"; public static final int DEFAULT_MAX_LINE_LENGTH = 512; + /** + * Charset name used by the netcat source to decode received lines and to encode acks + */ + public static final String CONFIG_SOURCE_ENCODING = "encoding"; + public static final String DEFAULT_ENCODING = "utf-8"; } http://git-wip-us.apache.org/repos/asf/flume/blob/4d712470/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java new file mode 100644 index 0000000..e11b4b6 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.apache.flume.*; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.ReplicatingChannelSelector; +import org.apache.flume.conf.Configurables; +import org.apache.flume.lifecycle.LifecycleController; +import org.apache.flume.lifecycle.LifecycleState; +import org.jboss.netty.channel.ChannelException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +public class TestNetcatSource { + private static final Logger logger = LoggerFactory + .getLogger(TestNetcatSource.class); + + /** + * The first five lines of the fable "The Crow and the Fox" + * written by Jean de La Fontaine, French poet. + * + * @see <a href="http://en.wikipedia.org/wiki/Jean_de_La_Fontaine">Jean de La Fontaine on wikipedia</a> + */ + private final String french = "Maître Corbeau, sur un arbre perché, " + "Tenait en son bec un fromage. 
" + "Maître Renard, par l'odeur alléché, " + "Lui tint à peu près ce langage : " + "Et bonjour, Monsieur du Corbeau,"; + + private final String english = "At the top of a tree perched Master Crow; " + "In his beak he was holding a cheese. " + "Drawn by the smell, Master Fox spoke, below. " + "The words, more or less, were these: " + "\"Hey, now, Sir Crow! Good day, good day!"; + + private int selectedPort; + private NetcatSource source; + private Channel channel; + private InetAddress localhost; + private final Charset defaultCharset = Charset.forName("UTF-8"); + + /** + * Sets up the Netcat source and a Flume Memory Channel on localhost + * + * @throws UnknownHostException + */ + @Before + public void setUp() throws UnknownHostException { + localhost = InetAddress.getByName("127.0.0.1"); + source = new NetcatSource(); + channel = new MemoryChannel(); + + Configurables.configure(channel, new Context()); + + List<Channel> channels = new ArrayList<Channel>(); + channels.add(channel); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(channels); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + } + + /** + * Test with UTF-16BE encoding Text with both french and english sentences + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testUTF16BEencoding() throws InterruptedException, IOException { + String encoding = "UTF-16BE"; + startSource(encoding, "false", "1", "512"); + Socket netcatSocket = new Socket(localhost, selectedPort); + try { + // Test on english text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, english, encoding); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + } + // Test on french text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, french, encoding); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + } + } 
finally { + netcatSocket.close(); + stopSource(); + } + } + + /** + * Test with UTF-16LE encoding Text with both french and english sentences + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testUTF16LEencoding() throws InterruptedException, IOException { + String encoding = "UTF-16LE"; + startSource(encoding, "false", "1", "512"); + Socket netcatSocket = new Socket(localhost, selectedPort); + try { + // Test on english text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, english, encoding); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + } + // Test on french text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, french, encoding); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + } + } finally { + netcatSocket.close(); + stopSource(); + } + } + + /** + * Test with UTF-8 encoding Text with both french and english sentences + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testUTF8encoding() throws InterruptedException, IOException { + String encoding = "UTF-8"; + startSource(encoding, "false", "1", "512"); + Socket netcatSocket = new Socket(localhost, selectedPort); + try { + // Test on english text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, english, encoding); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + } + // Test on french text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, french, encoding); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + } + } finally { + netcatSocket.close(); + stopSource(); + } + } + + /** + * Test with ISO-8859-1 encoding Text with both french and english sentences + * + * @throws InterruptedException + * @throws IOException + */ 
@Test + public void testISO88591encoding() throws InterruptedException, IOException { + String encoding = "ISO-8859-1"; + startSource(encoding, "false", "1", "512"); + Socket netcatSocket = new Socket(localhost, selectedPort); + try { + // Test on english text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, english, encoding); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + } + // Test on french text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, french, encoding); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + } + } finally { + netcatSocket.close(); + stopSource(); + } + } + + /** + * Test if an ack is sent for every event in the correct encoding + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testAck() throws InterruptedException, IOException { + String encoding = "UTF-8"; + String ackEvent = "OK"; + startSource(encoding, "true", "1", "512"); + Socket netcatSocket = new Socket(localhost, selectedPort); + LineIterator inputLineIterator = IOUtils.lineIterator(netcatSocket.getInputStream(), encoding); + try { + // Test on english text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, english, encoding); + Assert.assertArrayEquals("Channel contained our event", english.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertEquals("Socket contained the Ack", ackEvent, inputLineIterator.nextLine()); + } + // Test on french text snippet + for (int i = 0; i < 20; i++) { + sendEvent(netcatSocket, french, encoding); + Assert.assertArrayEquals("Channel contained our event", french.getBytes(defaultCharset), getFlumeEvent()); + Assert.assertEquals("Socket contained the Ack", ackEvent, inputLineIterator.nextLine()); + } + } finally { + netcatSocket.close(); + stopSource(); + } + } + + /** + * Test that lines longer than MaxLineLength are discarded + * 
@throws InterruptedException + * @throws IOException + */ + @Test + public void testMaxLineLength() throws InterruptedException, IOException { + String encoding = "UTF-8"; + startSource(encoding, "false", "1", "10"); + Socket netcatSocket = new Socket(localhost, selectedPort); + try { + sendEvent(netcatSocket, "123456789", encoding); + Assert.assertArrayEquals("Channel contained our event", "123456789".getBytes(defaultCharset), getFlumeEvent()); + sendEvent(netcatSocket, english, encoding); + Assert.assertNull("Channel does not contain an event", getRawFlumeEvent()); + } finally { + netcatSocket.close(); + stopSource(); + } + } + + /** + * Test that lines longer than MaxLineLength are discarded and answered with an error ack + * + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testMaxLineLengthwithAck() throws InterruptedException, IOException { + String encoding = "UTF-8"; + String ackEvent = "OK"; + String ackErrorEvent = "FAILED: Event exceeds the maximum length (10 chars, including newline)"; + startSource(encoding, "true", "1", "10"); + Socket netcatSocket = new Socket(localhost, selectedPort); + LineIterator inputLineIterator = IOUtils.lineIterator(netcatSocket.getInputStream(), encoding); + try { + sendEvent(netcatSocket, "123456789", encoding); + Assert.assertArrayEquals("Channel contained our event", "123456789".getBytes(defaultCharset), getFlumeEvent()); + Assert.assertEquals("Socket contained the Ack", ackEvent, inputLineIterator.nextLine()); + sendEvent(netcatSocket, english, encoding); + Assert.assertEquals("Socket contained the Error Ack", ackErrorEvent, inputLineIterator.nextLine()); + Assert.assertNull("Channel does not contain an event", getRawFlumeEvent()); + } finally { + netcatSocket.close(); + stopSource(); + } + } + + private void startSource(String encoding, String ack, String batchSize, String maxLineLength) throws InterruptedException { + boolean bound = false; + + for (int i = 0; i < 100 && !bound; i++) { + try { + Context 
context = new Context(); + context.put("port", String.valueOf(selectedPort = 10500 + i)); + context.put("bind", "0.0.0.0"); + context.put("ack-every-event", ack); + context.put("encoding", encoding); + context.put("batch-size", batchSize); + context.put("max-line-length", maxLineLength); + + Configurables.configure(source, context); + + source.start(); + bound = true; + } catch (ChannelException e) { + /* + * NB: copied from TestAvroSource. NetcatSource binds a plain ServerSocketChannel, not Netty, + * so a bind failure may not surface as a ChannelException here - TODO confirm. + */ + } + } + + Assert.assertTrue("Reached start or error", + LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR)); + Assert.assertEquals("Server is started", LifecycleState.START, + source.getLifecycleState()); + } + + private void sendEvent(Socket socket, String content, String encoding) throws IOException { + OutputStream output = socket.getOutputStream(); + IOUtils.write(content + IOUtils.LINE_SEPARATOR_UNIX, output, encoding); + output.flush(); + } + + private byte[] getFlumeEvent() { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + Event event = channel.take(); + Assert.assertNotNull(event); + + try { + transaction.commit(); + } catch (Throwable t) { + transaction.rollback(); + } finally { + transaction.close(); + } + + logger.debug("Round trip event:{}", event); + + return event.getBody(); + } + + private Event getRawFlumeEvent() { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + Event event = channel.take(); + + try { + transaction.commit(); + } catch (Throwable t) { + transaction.rollback(); + } finally { + transaction.close(); + } + + logger.debug("Round trip event:{}", event); + + return event; + } + + private void stopSource() throws InterruptedException { + source.stop(); + Assert.assertTrue("Reached stop or error", + LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR)); + Assert.assertEquals("Server 
is stopped", LifecycleState.STOP, + 
source.getLifecycleState()); + logger.info("Source stopped"); + } +}
