Repository: flume
Updated Branches:
  refs/heads/trunk 5f6d6c3b1 -> 4d7124700


FLUME-2628. Add an optional parameter to specify the expected input text 
encoding for the netcat source

(Lionel Herbet via Johny Rufus)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d712470
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d712470
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d712470

Branch: refs/heads/trunk
Commit: 4d7124700f678169756df81aa110bd979a4bcb93
Parents: 5f6d6c3
Author: Johny Rufus <[email protected]>
Authored: Mon Jul 13 18:01:10 2015 -0700
Committer: Johny Rufus <[email protected]>
Committed: Fri Jul 17 09:58:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/NetcatSource.java   |  13 +-
 .../NetcatSourceConfigurationConstants.java     |   5 +
 .../apache/flume/source/TestNetcatSource.java   | 374 +++++++++++++++++++
 3 files changed, 390 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/4d712470/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
index 2da38bb..61e3f90 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
@@ -113,6 +113,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
   private int port;
   private int maxLineLength;
   private boolean ackEveryEvent;
+  private String sourceEncoding;
 
   private CounterGroup counterGroup;
   private ServerSocketChannel serverSocket;
@@ -142,6 +143,10 @@ public class NetcatSource extends AbstractSource 
implements Configurable,
     maxLineLength = context.getInteger(
         NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH,
         NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH);
+    sourceEncoding = context.getString(
+        NetcatSourceConfigurationConstants.CONFIG_SOURCE_ENCODING,
+        NetcatSourceConfigurationConstants.DEFAULT_ENCODING
+    );
   }
 
   @Override
@@ -176,6 +181,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
     acceptRunnable.ackEveryEvent = ackEveryEvent;
     acceptRunnable.source = this;
     acceptRunnable.serverSocket = serverSocket;
+    acceptRunnable.sourceEncoding = sourceEncoding;
 
     acceptThread = new Thread(acceptRunnable);
 
@@ -251,6 +257,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
     private EventDrivenSource source;
     private AtomicBoolean shouldStop;
     private boolean ackEveryEvent;
+    private String sourceEncoding;
 
     private final int maxLineLength;
 
@@ -272,6 +279,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
           request.counterGroup = counterGroup;
           request.source = source;
           request.ackEveryEvent = ackEveryEvent;
+          request.sourceEncoding = sourceEncoding;
 
           handlerService.submit(request);
 
@@ -294,6 +302,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
     private CounterGroup counterGroup;
     private SocketChannel socketChannel;
     private boolean ackEveryEvent;
+    private String sourceEncoding;
 
     private final int maxLineLength;
 
@@ -307,8 +316,8 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
       Event event = null;
 
       try {
-        Reader reader = Channels.newReader(socketChannel, "utf-8");
-        Writer writer = Channels.newWriter(socketChannel, "utf-8");
+        Reader reader = Channels.newReader(socketChannel, sourceEncoding);
+        Writer writer = Channels.newWriter(socketChannel, sourceEncoding);
         CharBuffer buffer = CharBuffer.allocate(maxLineLength);
         buffer.flip(); // flip() so fill() sees buffer as initially empty
 

http://git-wip-us.apache.org/repos/asf/flume/blob/4d712470/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
index fdf318a..1720d5f 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
@@ -42,4 +42,9 @@ public class NetcatSourceConfigurationConstants {
   public static final String CONFIG_MAX_LINE_LENGTH = "max-line-length";
   public static final int DEFAULT_MAX_LINE_LENGTH = 512;
 
+  /**
+   * Charset name used to decode (and reply to) netcat source text
+   */
+  public static final String CONFIG_SOURCE_ENCODING = "encoding";
+  public static final String DEFAULT_ENCODING = "utf-8";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/4d712470/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
new file mode 100644
index 0000000..e11b4b6
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.flume.*;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.jboss.netty.channel.ChannelException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestNetcatSource {
+  // Bound to this test class so log output is attributed to the right test.
+  private static final Logger logger = LoggerFactory
+          .getLogger(TestNetcatSource.class);
+
+  /**
+   * The first five lines of the fable "The Crow and the Fox"
+   * by the French poet Jean de La Fontaine.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Jean_de_La_Fontaine">Jean de La Fontaine on Wikipedia</a>
+   */
+  private final String french = "Maître Corbeau, sur un arbre perché, " +
+          "Tenait en son bec un fromage. " +
+          "Maître Renard, par l'odeur alléché, " +
+          "Lui tint à peu près ce langage : " +
+          "Et bonjour, Monsieur du Corbeau,";
+
+  // English sample text; unlike the French sample it contains only ASCII characters.
+  private final String english = "At the top of a tree perched Master Crow; " +
+          "In his beak he was holding a cheese. " +
+          "Drawn by the smell, Master Fox spoke, below. " +
+          "The words, more or less, were these: " +
+          "\"Hey, now, Sir Crow! Good day, good day!";
+
+  // Port most recently tried by startSource(); the test client sockets connect to it.
+  private int selectedPort;
+  // Source under test; re-created by setUp() before every test.
+  private NetcatSource source;
+  // Channel the source is wired to; the tests take events back out of it.
+  private Channel channel;
+  // Address the test client sockets connect to (set to 127.0.0.1 in setUp()).
+  private InetAddress localhost;
+  // Charset used to build the expected event bodies, whatever encoding the client sent.
+  private Charset defaultCharset = Charset.forName("UTF-8");
+
+  /**
+   * Creates a fresh Netcat source and wires it to a Flume memory channel
+   * through a replicating channel selector; clients connect via 127.0.0.1.
+   *
+   * @throws UnknownHostException
+   */
+  @Before
+  public void setUp() throws UnknownHostException {
+    localhost = InetAddress.getByName("127.0.0.1");
+
+    // Channel side: a memory channel configured with an empty context
+    channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    List<Channel> targetChannels = new ArrayList<Channel>();
+    targetChannels.add(channel);
+
+    ChannelSelector selector = new ReplicatingChannelSelector();
+    selector.setChannels(targetChannels);
+    ChannelProcessor processor = new ChannelProcessor(selector);
+
+    // Source side: every event the source emits goes through the processor
+    source = new NetcatSource();
+    source.setChannelProcessor(processor);
+  }
+
+  /**
+   * Test with UTF-16BE encoding Text with both french and english sentences
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testUTF16BEencoding() throws InterruptedException, IOException {
+    assertEncodingRoundTrip("UTF-16BE");
+  }
+
+  /**
+   * Test with UTF-16LE encoding Text with both french and english sentences
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testUTF16LEencoding() throws InterruptedException, IOException {
+    assertEncodingRoundTrip("UTF-16LE");
+  }
+
+  /**
+   * Test with UTF-8 encoding Text with both french and english sentences
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testUTF8encoding() throws InterruptedException, IOException {
+    assertEncodingRoundTrip("UTF-8");
+  }
+
+  /**
+   * Test with ISO-8859-1 encoding Text with both french and english sentences
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testIS88591encoding() throws InterruptedException, IOException {
+    assertEncodingRoundTrip("ISO-8859-1");
+  }
+
+  /**
+   * Starts the source with the given input encoding, sends the English and
+   * then the French sample 20 times each over a single connection, and checks
+   * after every send that the channel holds the sample encoded as UTF-8.
+   *
+   * @param encoding charset name configured on the source and used by the client
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private void assertEncodingRoundTrip(String encoding)
+          throws InterruptedException, IOException {
+    startSource(encoding, "false", "1", "512");
+    try {
+      // Connect inside the try so the source is stopped even if connecting fails
+      Socket netcatSocket = new Socket(localhost, selectedPort);
+      try {
+        for (String sample : new String[] {english, french}) {
+          for (int i = 0; i < 20; i++) {
+            sendEvent(netcatSocket, sample, encoding);
+            Assert.assertArrayEquals("Channel contained our event",
+                    sample.getBytes(defaultCharset), getFlumeEvent());
+          }
+        }
+      } finally {
+        netcatSocket.close();
+      }
+    } finally {
+      stopSource();
+    }
+  }
+
+  /**
+   * Test if an ack is sent for every event in the correct encoding
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testAck() throws InterruptedException, IOException {
+    String encoding = "UTF-8";
+    String ackEvent = "OK";
+    startSource(encoding, "true", "1", "512");
+    try {
+      // Connect inside the try so the source is stopped even if connecting fails
+      Socket netcatSocket = new Socket(localhost, selectedPort);
+      try {
+        LineIterator inputLineIterator =
+                IOUtils.lineIterator(netcatSocket.getInputStream(), encoding);
+        // English sample first, then the French one, 20 round trips each
+        for (String sample : new String[] {english, french}) {
+          for (int i = 0; i < 20; i++) {
+            sendEvent(netcatSocket, sample, encoding);
+            Assert.assertArrayEquals("Channel contained our event",
+                    sample.getBytes(defaultCharset), getFlumeEvent());
+            Assert.assertEquals("Socket contained the Ack", ackEvent,
+                    inputLineIterator.nextLine());
+          }
+        }
+      } finally {
+        netcatSocket.close();
+      }
+    } finally {
+      stopSource();
+    }
+  }
+
+  /**
+   * Test that a line longer than max-line-length is discarded while a line
+   * that fits is delivered
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testMaxLineLength() throws InterruptedException, IOException {
+    String encoding = "UTF-8";
+    startSource(encoding, "false", "1", "10");
+    try {
+      // Connect inside the try so the source is stopped even if connecting fails
+      Socket netcatSocket = new Socket(localhost, selectedPort);
+      try {
+        // 9 characters plus the newline fit into the 10-character limit
+        sendEvent(netcatSocket, "123456789", encoding);
+        Assert.assertArrayEquals("Channel contained our event",
+                "123456789".getBytes(defaultCharset), getFlumeEvent());
+        // The English sample is far longer than the limit
+        sendEvent(netcatSocket, english, encoding);
+        Assert.assertNull("Channel does not contain an event",
+                getRawFlumeEvent());
+      } finally {
+        netcatSocket.close();
+      }
+    } finally {
+      stopSource();
+    }
+  }
+
+  /**
+   * Test that, with acks enabled, a line that fits is acknowledged with OK
+   * and a line longer than max-line-length is discarded and answered with
+   * a failure message
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testMaxLineLengthwithAck() throws InterruptedException,
+          IOException {
+    String encoding = "UTF-8";
+    String ackEvent = "OK";
+    String ackErrorEvent =
+            "FAILED: Event exceeds the maximum length (10 chars, including newline)";
+    startSource(encoding, "true", "1", "10");
+    try {
+      // Connect inside the try so the source is stopped even if connecting fails
+      Socket netcatSocket = new Socket(localhost, selectedPort);
+      try {
+        LineIterator inputLineIterator =
+                IOUtils.lineIterator(netcatSocket.getInputStream(), encoding);
+        // 9 characters plus the newline fit into the 10-character limit
+        sendEvent(netcatSocket, "123456789", encoding);
+        Assert.assertArrayEquals("Channel contained our event",
+                "123456789".getBytes(defaultCharset), getFlumeEvent());
+        Assert.assertEquals("Socket contained the Ack", ackEvent,
+                inputLineIterator.nextLine());
+        // The English sample is far longer than the limit
+        sendEvent(netcatSocket, english, encoding);
+        Assert.assertNull("Channel does not contain an event",
+                getRawFlumeEvent());
+        Assert.assertEquals("Socket contained the Error Ack", ackErrorEvent,
+                inputLineIterator.nextLine());
+      } finally {
+        netcatSocket.close();
+      }
+    } finally {
+      stopSource();
+    }
+  }
+
+  /**
+   * Configures and starts the source, trying ports 10500 to 10599 in turn
+   * until start() completes without a ChannelException. The port of the last
+   * attempt is kept in selectedPort. Fails the test if no attempt succeeds
+   * or if the source does not reach the START state.
+   *
+   * @param encoding value for the "encoding" property
+   * @param ack value for the "ack-every-event" property
+   * @param batchSize value for the "batch-size" property
+   * @param maxLineLength value for the "max-line-length" property
+   * @throws InterruptedException
+   */
+  private void startSource(String encoding, String ack, String batchSize,
+          String maxLineLength) throws InterruptedException {
+    boolean bound = false;
+
+    for (int i = 0; i < 100 && !bound; i++) {
+      try {
+        selectedPort = 10500 + i;
+
+        Context context = new Context();
+        context.put("port", String.valueOf(selectedPort));
+        context.put("bind", "0.0.0.0");
+        context.put("ack-every-event", ack);
+        context.put("encoding", encoding);
+        context.put("batch-size", batchSize);
+        context.put("max-line-length", maxLineLength);
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        /*
+         * NB: This assumes we're using the Netty server under the hood and the
+         * failure is to bind. Yucky.
+         * NOTE(review): confirm that NetcatSource.start() really reports a
+         * bind failure as a Netty ChannelException.
+         */
+      }
+    }
+
+    // Fail fast instead of waiting on a source that was never started
+    Assert.assertTrue("Could not start the source on any port in 10500-10599",
+            bound);
+
+    Assert.assertTrue("Reached start or error",
+            LifecycleController.waitForOneOf(source,
+                    LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+            source.getLifecycleState());
+  }
+
+  /**
+   * Writes the content followed by a Unix line separator to the socket,
+   * encoded with the given charset, and flushes the stream.
+   *
+   * @param socket connected client socket
+   * @param content text of the event, without a line terminator
+   * @param encoding charset name used to encode the line
+   * @throws IOException
+   */
+  private void sendEvent(Socket socket, String content, String encoding)
+          throws IOException {
+    String line = content + IOUtils.LINE_SEPARATOR_UNIX;
+    OutputStream stream = socket.getOutputStream();
+    IOUtils.write(line, stream, encoding);
+    stream.flush();
+  }
+
+  /**
+   * Takes the next event from the channel and returns its body.
+   * Fails the test if the channel did not hand out an event.
+   *
+   * @return body of the event taken from the channel
+   */
+  private byte[] getFlumeEvent() {
+    // Reuse the raw take so the transaction is already committed and closed
+    // when the assertion runs; a failed assertion no longer leaves it open.
+    Event event = getRawFlumeEvent();
+    Assert.assertNotNull(event);
+
+    return event.getBody();
+  }
+
+  /**
+   * Takes one event from the channel inside its own transaction.
+   *
+   * @return whatever channel.take() returned, which the callers expect to be
+   *         null when the channel has no event
+   */
+  private Event getRawFlumeEvent() {
+    final Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    final Event taken = channel.take();
+
+    try {
+      tx.commit();
+    } catch (Throwable commitFailure) {
+      // Best effort: undo the take; the event is still returned to the caller
+      tx.rollback();
+    } finally {
+      tx.close();
+    }
+
+    logger.debug("Round trip event:{}", taken);
+
+    return taken;
+  }
+
+  /**
+   * Stops the source and checks that it ends up in the STOP state.
+   *
+   * @throws InterruptedException
+   */
+  private void stopSource() throws InterruptedException {
+    source.stop();
+
+    boolean settled = LifecycleController.waitForOneOf(source,
+            LifecycleState.STOP_OR_ERROR);
+    Assert.assertTrue("Reached stop or error", settled);
+
+    LifecycleState finalState = source.getLifecycleState();
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP, finalState);
+
+    logger.info("Source stopped");
+  }
+}

Reply via email to