Repository: flume Updated Branches: refs/heads/trunk 1003d1f41 -> 34e9bda31
FLUME-2373. Support TBinaryProtocol in Thrift RPC. (Stefan Krawczyk via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/34e9bda3 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/34e9bda3 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/34e9bda3 Branch: refs/heads/trunk Commit: 34e9bda312506a118fad87fcbdecc48bf3918c95 Parents: 1003d1f Author: Hari Shreedharan <[email protected]> Authored: Thu Dec 18 12:13:19 2014 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Dec 18 12:13:19 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/flume/sink/ThriftSink.java | 4 +-- .../org/apache/flume/source/ThriftSource.java | 29 ++++++++++++++++-- .../org/apache/flume/sink/TestThriftSink.java | 11 ++++--- .../org/apache/flume/api/ThriftRpcClient.java | 32 ++++++++++++++++++-- .../apache/flume/api/TestThriftRpcClient.java | 13 ++++---- .../apache/flume/api/ThriftTestingSource.java | 15 +++++++-- 6 files changed, 85 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java index 48a9775..baa60d0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java @@ -18,11 +18,11 @@ */ package org.apache.flume.sink; +import java.util.Properties; + import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientConfigurationConstants; import org.apache.flume.api.RpcClientFactory; - -import java.util.Properties; /** * <p> * A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index c3881b4..551fe13 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java @@ -34,6 +34,7 @@ import org.apache.flume.thrift.ThriftSourceProtocol; import org.apache.flume.thrift.ThriftFlumeEvent; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TFastFramedTransport; @@ -70,6 +71,13 @@ public class ThriftSource extends AbstractSource implements Configurable, * Config param for the port to listen on. */ public static final String CONFIG_PORT = "port"; + /** + * Config param for the thrift protocol to use. + */ + public static final String CONFIG_PROTOCOL = "protocol"; + public static final String BINARY_PROTOCOL = "binary"; + public static final String COMPACT_PROTOCOL = "compact"; + private Integer port; private String bindAddress; private int maxThreads = 0; @@ -77,6 +85,7 @@ public class ThriftSource extends AbstractSource implements Configurable, private TServer server; private TServerTransport serverTransport; private ExecutorService servingExecutor; + private String protocol; @Override public void configure(Context context) { @@ -98,6 +107,17 @@ public class ThriftSource extends AbstractSource implements Configurable, if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } + + protocol = context.getString(CONFIG_PROTOCOL); + if (protocol == null) { + // default is to use the compact protocol. + protocol = COMPACT_PROTOCOL; + } + Preconditions.checkArgument( + (protocol.equalsIgnoreCase(BINARY_PROTOCOL) || + protocol.equalsIgnoreCase(COMPACT_PROTOCOL)), + "binary or compact are the only valid Thrift protocol types to " + + "choose from."); } @Override @@ -167,8 +187,13 @@ public class ThriftSource extends AbstractSource implements Configurable, } try { - - args.protocolFactory(new TCompactProtocol.Factory()); + if (protocol.equals(BINARY_PROTOCOL)) { + logger.info("Using TBinaryProtocol"); + args.protocolFactory(new TBinaryProtocol.Factory()); + } else { + logger.info("Using TCompactProtocol"); + args.protocolFactory(new TCompactProtocol.Factory()); + } args.inputTransportFactory(new TFastFramedTransport.Factory()); args.outputTransportFactory(new TFastFramedTransport.Factory()); args.processor(new ThriftSourceProtocol http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java index 5f70d1b..fccaede 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java @@ -19,15 +19,18 @@ package org.apache.flume.sink; import com.google.common.base.Charsets; + import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink; import org.apache.flume.Transaction; +import org.apache.flume.api.ThriftRpcClient; import org.apache.flume.api.ThriftTestingSource; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.ThriftSource; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,7 +61,7 @@ public class TestThriftSink { context.put("port", String.valueOf(port)); context.put("batch-size", String.valueOf(2)); context.put("request-timeout", String.valueOf(2000L)); - + context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL); sink.setChannel(channel); Configurables.configure(sink, context); @@ -77,7 +80,7 @@ public class TestThriftSink { Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), - port); + port, ThriftRpcClient.COMPACT_PROTOCOL); channel.start(); sink.start(); @@ -108,7 +111,7 @@ public class TestThriftSink { public void testTimeout() throws Exception { AtomicLong delay = new AtomicLong(); src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ALTERNATE - .name(), port); + .name(), port, ThriftRpcClient.COMPACT_PROTOCOL); src.setDelay(delay); delay.set(2500); @@ -182,7 +185,7 @@ public class TestThriftSink { } src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), - port); + port, ThriftRpcClient.COMPACT_PROTOCOL); for (int i = 0; i < 5; i++) { Sink.Status status = sink.process(); http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index cf45ab9..6382a0e 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java @@ -24,6 +24,7 @@ import org.apache.flume.FlumeException; import org.apache.flume.thrift.Status; import org.apache.flume.thrift.ThriftFlumeEvent; import org.apache.flume.thrift.ThriftSourceProtocol; +import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TSocket; @@ -57,6 +58,13 @@ public class ThriftRpcClient extends AbstractRpcClient { private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRpcClient.class); + /** + * Config param for the thrift protocol to use. + */ + public static final String CONFIG_PROTOCOL = "protocol"; + public static final String BINARY_PROTOCOL = "binary"; + public static final String COMPACT_PROTOCOL = "compact"; + private int batchSize; private long requestTimeout; private final Lock stateLock; @@ -68,6 +76,7 @@ public class ThriftRpcClient extends AbstractRpcClient { private final AtomicLong threadCounter; private int connectionPoolSize; private final Random random = new Random(); + private String protocol; public ThriftRpcClient() { stateLock = new ReentrantLock(true); @@ -267,6 +276,18 @@ public class ThriftRpcClient extends AbstractRpcClient { HostInfo host = HostInfo.getHostInfoList(properties).get(0); hostname = host.getHostName(); port = host.getPortNumber(); + protocol = properties.getProperty(CONFIG_PROTOCOL); + if (protocol == null) { + // default is to use the compact protocol. + protocol = COMPACT_PROTOCOL; + } + // check in case that garbage was put in. + if (!(protocol.equalsIgnoreCase(BINARY_PROTOCOL) || + protocol.equalsIgnoreCase(COMPACT_PROTOCOL))) { + LOGGER.warn("'binary' or 'compact' are the only valid Thrift protocol types to " + + "choose from. Defaulting to 'compact'."); + protocol = COMPACT_PROTOCOL; + } batchSize = Integer.parseInt(properties.getProperty( RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString())); @@ -322,8 +343,15 @@ public class ThriftRpcClient extends AbstractRpcClient { public ClientWrapper() throws Exception{ transport = new TFastFramedTransport(new TSocket(hostname, port)); transport.open(); - client = new ThriftSourceProtocol.Client(new TCompactProtocol - (transport)); + if (protocol.equals(BINARY_PROTOCOL)) { + LOGGER.info("Using TBinaryProtocol"); + client = new ThriftSourceProtocol.Client(new TBinaryProtocol + (transport)); + } else { + LOGGER.info("Using TCompactProtocol"); + client = new ThriftSourceProtocol.Client(new TCompactProtocol + (transport)); + } // Not a great hash code, but since this class is immutable and there // is at most one instance of the components of this class, // this works fine [If the objects are equal, hash code is the same] http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java index 88eb5e7..a8baaa8 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java @@ -56,6 +56,7 @@ public class TestThriftRpcClient { props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10"); props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, "2000"); + props.setProperty(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL); } @After @@ -103,7 +104,7 @@ public class TestThriftRpcClient { @Test public void testOK() throws Exception { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), - port); + port, ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getInstance(props); insertEvents(client, 10); //10 events insertAsBatch(client, 10, 25); //16 events @@ -121,7 +122,7 @@ public class TestThriftRpcClient { @Test public void testSlow() throws Exception { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.SLOW.name(), - port); + port, ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getInstance(props); insertEvents(client, 2); //2 events insertAsBatch(client, 2, 25); //24 events (3 batches) @@ -139,7 +140,7 @@ public class TestThriftRpcClient { @Test(expected = EventDeliveryException.class) public void testFail() throws Exception { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.FAIL.name(), - port); + port, ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getInstance(props); insertEvents(client, 2); //2 events Assert.fail("Expected EventDeliveryException to be thrown."); @@ -149,7 +150,7 @@ public class TestThriftRpcClient { public void testError() throws Throwable { try { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR - .name(), port); + .name(), port, ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + ".0", port); insertEvents(client, 2); //2 events @@ -163,7 +164,7 @@ public class TestThriftRpcClient { public void testTimeout() throws Throwable { try { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT - .name(), port); + .name(), port, ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getThriftInstance(props); insertEvents(client, 2); //2 events } catch (EventDeliveryException ex) { @@ -174,7 +175,7 @@ public class TestThriftRpcClient { @Test public void testMultipleThreads() throws Throwable { src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), - port); + port, ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + ".0", port, 10); int threadCount = 100; http://git-wip-us.apache.org/repos/asf/flume/blob/34e9bda3/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java index cde7269..63d2fc3 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java @@ -25,7 +25,11 @@ import org.apache.flume.thrift.Status; import org.apache.flume.thrift.ThriftFlumeEvent; import org.apache.flume.thrift.ThriftSourceProtocol; import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TNonblockingServerSocket; @@ -180,7 +184,7 @@ public class ThriftTestingSource { } } - public ThriftTestingSource(String handlerName, int port) throws Exception { + public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port)); ThriftSourceProtocol.Iface handler = null; @@ -197,11 +201,16 @@ public class ThriftTestingSource { } else if (handlerName.equals(HandlerType.ALTERNATE.name())) { handler = new ThriftAlternateHandler(); } - + TProtocolFactory transportProtocolFactory = null; + if (protocol != null && protocol == ThriftRpcClient.BINARY_PROTOCOL) { + transportProtocolFactory = new TBinaryProtocol.Factory(); + } else { + transportProtocolFactory = new TCompactProtocol.Factory(); + } server = new THsHaServer(new THsHaServer.Args (serverTransport).processor( new ThriftSourceProtocol.Processor(handler)).protocolFactory( - new TCompactProtocol.Factory())); + transportProtocolFactory)); Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() {
