[ 
https://issues.apache.org/jira/browse/KAFKA-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292748#comment-14292748
 ] 

Navina Ramesh commented on KAFKA-1897:
--------------------------------------

1. 
 Yes. We can use the Cluster constructor. However, it is, as you mentioned, 
inconvenient. A client may not always want an empty cluster. The client should 
be able to create a topic with a fixed number of partitions, without worrying 
about the details of how "PartitionInfo" or "TopicPartition" is structured 
within Kafka. 

*Solution*:
Instead of creating multiple constructor interfaces for the MockProducer, you 
can expose helper methods for creating a cluster (similar to the ones in 
org/apache/samza/utils/TestUtils.java). This way the client can quickly create 
a cluster and customize it per the test requirements. A workaround for this 
issue is being addressed in 
https://issues.apache.org/jira/browse/KAFKA-1861  

2. 
I am unable to conclude whether this is a gap in functionality or 
documentation. 
In the Samza use case, we use a generic producer interface that wraps the Kafka 
Producer. Because of this, we don't have access to the callback/future handles. 
We also have blocking calls that depend on the producer thread to change the 
state and unblock them.

*Test Case*:
{code:borderStyle=solid}
MockProducer mock = new MockProducer(false)
KafkaSystemProducer p = new KafkaSystemProducer(producer=mock, ...)
p.send("test", msg1)
p.send("test", msg2)
p.flush()       // --> Blocking on actual send!

mock.completeNext()             // --> Never reached, because this thread is blocked in flush()
{code}

Without the MockProducer running on a parallel thread it is difficult to make 
assertions. Assertions / intercepts have to be made outside the main test 
thread.

*Current Approach*:
{code:borderStyle=solid}
CustomMockProducer mock = new CustomMockProducer(false)
KafkaSystemProducer p = new KafkaSystemProducer(producer=mock, ...)
p.send("test", msg1)
p.send("test", msg2)
mock.startSendThread(100)       // Starts a thread that invokes the buffered callbacks (mock.callback())
p.flush()                       // --> Blocking call; eventually unblocks when the mock producer thread
                                // completes the send. If it doesn't unblock, the test has failed. If the
                                // send throws an exception, we can intercept it here in the main thread.
assert(...)                     // Tests that check the state of the Kafka system producer go here
{code}

*Alternate approach* - Start the blocking call in a separate thread and use the 
existing MockProducer
{code:borderStyle=solid}
MockProducer mock = new MockProducer(false)
KafkaSystemProducer p = new KafkaSystemProducer(producer=mock, ...)
p.send("test", msg1)
p.send("test", msg2)
Thread t = new Thread(new Runnable() {
    public void run() {
        p.flush()  // Blocks in a separate thread
        // <-- Asserts / intercepts go here?? errorNext() does not bubble up exceptions
    }
})
t.start()
mock.completeNext()
mock.completeNext()
{code}
Drawback: the exception has to be asserted on, or intercepted, outside the 
main test thread.
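
One way to soften this drawback, sketched here with only java.util.concurrent and no Kafka classes, is to submit the blocking call to an ExecutorService instead of a raw Thread. The returned Future rethrows the failure in the test thread. The names FlushInExecutorSketch, flushAndCaptureFailure and latestSend are hypothetical; latestSend stands in for the future of the latest buffered send, and completing it exceptionally plays the role of errorNext().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FlushInExecutorSketch {
    /** Returns the message of the failure seen by the blocked flush, or null if it succeeded. */
    static String flushAndCaptureFailure() {
        // Hypothetical stand-in for the future returned by the latest buffered send.
        CompletableFuture<Long> latestSend = new CompletableFuture<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The blocking "flush" runs off the test thread.
            Callable<Long> blockingFlush = () -> latestSend.get();
            Future<Long> flush = executor.submit(blockingFlush);

            // The test thread stays free to drive the mock; here it fails the send.
            latestSend.completeExceptionally(new IllegalStateException("send failed"));

            try {
                flush.get(5, TimeUnit.SECONDS);
                return null; // flush completed without error
            } catch (ExecutionException e) {
                // The failure surfaces in the test thread; unwrap to the root cause.
                Throwable root = e;
                while (root.getCause() != null) {
                    root = root.getCause();
                }
                return root.getMessage();
            } catch (InterruptedException | TimeoutException e) {
                throw new RuntimeException("flush did not unblock", e);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(flushAndCaptureFailure()); // prints "send failed"
    }
}
```

The timeout on flush.get() also turns a flush that never unblocks into a test failure rather than a hung test.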

*Suggestions for the MockProducer*:
1. Provide an option to operate the producer in _concurrent mode_. This means a 
"send" will not invoke the callback immediately. With every send call, a future 
task is created with the callback and appended to the list of futures to be 
executed. "Completion" can be modified to make this work. This way the test 
thread and the producer thread can be decoupled.
2. Alternatively, the deque can be made accessible so that the client can 
choose to invoke the completion of sends concurrently. 
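
A minimal sketch of what such a concurrent mode might look like, using only the JDK. BufferingMock and BlockingProducer are hypothetical stand-ins, not Kafka's MockProducer or Samza's KafkaSystemProducer, and startSendThread simply borrows the name used in the test snippets above. Each send is buffered as an incomplete future in a deque, and a background thread completes them while the test thread is blocked in flush.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class ConcurrentMockSketch {
    /** Hypothetical buffering mock; each send stays pending until completed. */
    static class BufferingMock {
        private final Deque<CompletableFuture<Long>> pending = new ArrayDeque<>();
        private long nextOffset = 0;

        /** Buffers the send and returns a future that is not yet complete. */
        synchronized CompletableFuture<Long> send(String topic, String msg) {
            CompletableFuture<Long> f = new CompletableFuture<>();
            pending.add(f);
            return f;
        }

        /** Completes the oldest buffered send; returns false if none is pending. */
        synchronized boolean completeNext() {
            CompletableFuture<Long> f = pending.poll();
            if (f == null) {
                return false;
            }
            f.complete(nextOffset++);
            return true;
        }

        /** Starts a daemon thread that waits delayMs, then completes all buffered sends. */
        Thread startSendThread(long delayMs) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(delayMs);
                    while (completeNext()) {
                        // keep draining the deque
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
            return t;
        }
    }

    /** Hypothetical wrapper whose flush blocks on the latest send. */
    static class BlockingProducer {
        private final BufferingMock mock;
        private CompletableFuture<Long> latest;

        BlockingProducer(BufferingMock mock) {
            this.mock = mock;
        }

        void send(String topic, String msg) {
            latest = mock.send(topic, msg);
        }

        /** Blocks until the latest send completes, or fails after timeoutMs. */
        long flush(long timeoutMs) {
            try {
                return latest.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new RuntimeException("flush failed or timed out", e);
            }
        }
    }

    /** Runs the scenario and returns the offset that flush observed. */
    static long run() {
        BufferingMock mock = new BufferingMock();
        BlockingProducer p = new BlockingProducer(mock);
        p.send("test", "msg1");
        p.send("test", "msg2");
        mock.startSendThread(100); // completions happen on another thread
        return p.flush(5000);      // blocks here until the second send completes
    }

    public static void main(String[] args) {
        System.out.println("flush returned offset " + run()); // prints "flush returned offset 1"
    }
}
```

Because flush runs in the test thread, assertions on the producer state can follow it directly, and a flush that never unblocks shows up as a timeout.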


> Enhance MockProducer for more sophisticated tests
> -------------------------------------------------
>
>                 Key: KAFKA-1897
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1897
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>            Reporter: Navina Ramesh
>            Assignee: Jun Rao
>             Fix For: 0.8.2
>
>
> Based on the experience of upgrading the kafka producer in Samza, we faced 
> two main constraints when using MockProducer: 
> 1. The constructor requires a cluster specification, and the tools to create 
> a test cluster are not exposed. They are available from TestUtils in Kafka; 
> however, that jar is not published. This issue is currently being addressed 
> in KAFKA-1861.
> 2. No support for testing a blocking client call. For example, "flush" in 
> Samza blocks on the future returned by the latest send request. In order to 
> test this, the MockProducer which buffers it should run in a concurrent mode. 
> There is currently no provision to do this. We want the MockProducer to 
> buffer the send and then, complete the callback concurrently while we wait 
> for "flush" to unblock. 
> We can write unit tests that have improved coverage if we can add support for 
> concurrent execution of the MockProducer and unit test thread. For example 
> implementation, please refer to the latest version of 
> KafkaSystemProducer.scala in the Apache Samza repository.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
