[ https://issues.apache.org/jira/browse/KAFKA-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292748#comment-14292748 ]
Navina Ramesh commented on KAFKA-1897:
--------------------------------------

1. Yes, we can use the Cluster constructor. However, as you mentioned, it is inconvenient. A client may not always want an empty cluster: it should be able to create a topic with a fixed number of partitions without worrying about how "PartitionInfo" or "TopicPartition" is structured within Kafka.

*Solution*: Instead of adding multiple constructors to the MockProducer, you can expose helper methods for creating a cluster (similar to the ones in org/apache/samza/utils/TestUtils.java). This way the client can quickly create a cluster and customize it to the test's requirements. A workaround for this issue is tracked in https://issues.apache.org/jira/browse/KAFKA-1861

2. I am unable to tell whether this is a gap in functionality or in documentation. In the Samza use case, we use a generic producer interface that wraps the Kafka producer, so we don't have access to the callback/future handles. We also have blocking calls that depend on the producer thread to change state and unblock them.

*Test Case*:
{code:borderStyle=solid}
MockProducer mock = new MockProducer(false); // autoComplete = false: sends stay buffered
KafkaSystemProducer p = new KafkaSystemProducer(/* producer = */ mock, ...);
p.send("test", msg1);
p.send("test", msg2);
p.flush();           // --> Blocks on the actual send!
mock.completeNext(); // --> Never reached, because this thread is blocked in flush()
{code}

Without the MockProducer running on a parallel thread, it is difficult to make assertions: assertions and intercepts have to be made outside the main test thread.

*Current Approach*:
{code:borderStyle=solid}
CustomMockProducer mock = new CustomMockProducer(false);
KafkaSystemProducer p = new KafkaSystemProducer(/* producer = */ mock, ...);
p.send("test", msg1);
p.send("test", msg2);
mock.startSendThread(100); // Starts a thread that invokes the buffered callbacks (mock.callback())
p.flush();    // --> Blocking call; it eventually unblocks when the mock producer thread completes the sends.
              //     If it never unblocks, the test has failed. If a send throws an exception,
              //     we can intercept it here, in the main thread itself.
assert(...);  // We can now check the state of the KafkaSystemProducer
{code}

*Alternate approach*: Start the blocking call in a separate thread and use the existing MockProducer.
{code:borderStyle=solid}
MockProducer mock = new MockProducer(false);
final KafkaSystemProducer p = new KafkaSystemProducer(/* producer = */ mock, ...);
p.send("test", msg1);
p.send("test", msg2);
Thread t = new Thread(new Runnable() {
    public void run() {
        p.flush(); // Blocks in a separate thread
        // <-- Do asserts / intercepts go here?? errorNext() does not bubble up exceptions to the test thread
    }
});
t.start();
mock.completeNext();
mock.completeNext();
{code}

Drawback: we have to make assertions, and intercept the exception thrown by flush(), outside the main thread.

*Suggestions for the MockProducer*:
1. Provide an option to operate the producer in _concurrent mode_. This means a "send" will not invoke the callback immediately. Instead, each send call creates a future task (with its callback) and appends it to the list of futures to be executed. "Completion" can be modified to make this work. This way the test thread and the producer thread can be decoupled.
2. Alternatively, the deque can be made accessible so that the client can choose to complete the sends concurrently.
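A minimal sketch of how the two suggestions could fit together, in plain Java with no Kafka dependency. BufferingMockProducer, WrappingProducer and FlushTestSupport are hypothetical names, not Kafka or Samza APIs: the mock only buffers one CompletableFuture per send (the "concurrent mode"), the wrapper's flush() blocks on the latest send as described in the issue, and the test runs flush() on an ExecutorService so the test thread stays free to complete or fail the sends. Because Future.get() rethrows the flush() failure as an ExecutionException in the test thread, this also sidesteps the drawback of the alternate approach.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical "concurrent mode" mock (NOT Kafka's MockProducer API): send() only
// buffers a pending future; a later call from any thread completes or fails it.
class BufferingMockProducer {
    private final Deque<CompletableFuture<Long>> pending = new ArrayDeque<>();
    private long nextOffset = 0;

    // Topic and message are ignored; only the pending completion matters here.
    synchronized CompletableFuture<Long> send(String topic, String msg) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.addLast(future);
        return future;
    }

    // Completes the oldest buffered send with a fake offset; false if none is pending.
    synchronized boolean completeNext() {
        CompletableFuture<Long> future = pending.pollFirst();
        if (future == null) return false;
        future.complete(nextOffset++);
        return true;
    }

    // Fails the oldest buffered send with the given exception; false if none is pending.
    synchronized boolean errorNext(RuntimeException e) {
        CompletableFuture<Long> future = pending.pollFirst();
        if (future == null) return false;
        future.completeExceptionally(e);
        return true;
    }
}

// Hypothetical stand-in for a wrapper like KafkaSystemProducer: flush() blocks
// on the future of the latest send.
class WrappingProducer {
    private final BufferingMockProducer producer;
    private volatile CompletableFuture<Long> latest;

    WrappingProducer(BufferingMockProducer producer) { this.producer = producer; }

    void send(String topic, String msg) { latest = producer.send(topic, msg); }

    void flush() throws InterruptedException {
        CompletableFuture<Long> future = latest;
        if (future == null) return;
        try {
            future.get(); // blocks until another thread completes or fails the send
        } catch (ExecutionException e) {
            throw new IllegalStateException("flush failed: " + e.getCause().getMessage(), e.getCause());
        }
    }
}

class FlushTestSupport {
    // Runs the blocking flush() on a worker thread so the test thread can complete
    // or fail the buffered sends, then surfaces the outcome in the test thread.
    static String flushAndComplete(boolean failLatest) throws Exception {
        BufferingMockProducer mock = new BufferingMockProducer();
        WrappingProducer p = new WrappingProducer(mock);
        p.send("test", "msg1");
        p.send("test", "msg2");

        ExecutorService flusher = Executors.newSingleThreadExecutor();
        try {
            Future<?> flushed = flusher.submit(() -> { p.flush(); return null; });
            // msg2 is still pending, so flush() cannot have returned yet.
            if (flushed.isDone()) return "flush did not block";
            mock.completeNext(); // msg1 succeeds
            if (failLatest) {
                mock.errorNext(new RuntimeException("send failed")); // msg2 fails
            } else {
                mock.completeNext(); // msg2 succeeds
            }
            flushed.get(5, TimeUnit.SECONDS); // a TimeoutException means flush() never unblocked
            return "ok";
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the flush() failure, observed in the test thread
        } finally {
            flusher.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(flushAndComplete(false)); // ok
        System.out.println(flushAndComplete(true));  // flush failed: send failed
    }
}
```

The mock stays passive: nothing completes a send until the test says so, so the sequence (send, send, flush blocked, complete) needs no sleeps. This roughly corresponds to suggestion 2 (exposing the pending queue); an actual change to Kafka's MockProducer might look quite different.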
> Enhance MockProducer for more sophisticated tests
> -------------------------------------------------
>
>                 Key: KAFKA-1897
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1897
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>            Reporter: Navina Ramesh
>            Assignee: Jun Rao
>             Fix For: 0.8.2
>
>
> Based on the experience of upgrading the Kafka producer in Samza, we faced
> two main constraints when using MockProducer:
> 1. The constructor requires a cluster specification, and the tools to create a
> test cluster are not exposed. They are available from TestUtils in Kafka; however,
> that jar is not published. This issue is currently being addressed in
> KAFKA-1861.
> 2. There is no support for testing a blocking client call. For example, "flush" in
> Samza blocks on the future returned by the latest send request. To test
> this, the MockProducer, which buffers the send, should run in a concurrent mode.
> There is currently no provision to do this. We want the MockProducer to
> buffer the send and then complete the callback concurrently while we wait
> for "flush" to unblock.
> We could write unit tests with improved coverage if we added support for
> concurrent execution of the MockProducer and the unit test thread. For an example
> implementation, please refer to the latest version of
> KafkaSystemProducer.scala in the Apache Samza repository.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)