-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/
-----------------------------------------------------------

Review request for samza.


Repository: samza


Description
-------

In recent experiments running a Samza batch job (consuming HDFS data on Hadoop), 
the results were subpar compared to MapReduce and Spark. Looking closely at the 
metrics, we found two basic problems:

1) Not enough data to process. We spotted this because the unprocessed message 
queue length was zero much of the time.

2) Not processing fast enough. Samza's throughput was about the same for 
medium-size records (100B) and small records (10B), while Spark scaled very 
well with small records (over 1M/s).

The first problem was solved by increasing the buffer size. This ticket 
addresses the second problem with three major improvements:

- Option to turn off timer metrics calculation: a large share of Samza's 
processing time turns out to be spent just maintaining the timer metrics. While 
they are useful for debugging, they become a bottleneck when running a stable, 
high-performance job. In my test job, which consumes 8M mock messages, the run 
took 30 secs with timer metrics on and only 14 secs with them off.

- Java coding improvements: the AsyncRunLoop code can be further optimized for 
efficiency. Some of the thread-safe data structures I was using, such as 
Collections.synchronizedSet, do not give optimal performance. I switched to 
CopyOnWriteArraySet, which performs far better here because the set is small 
and reads far outnumber writes.

- Specific handling for in-order processing: AsyncRunLoop used the same 
callback-handling logic whether processing was in-order or out-of-order (max 
concurrency > 1), which incurs considerable overhead. Simplifying the logic for 
the in-order case improves performance.

After all three improvements, my test job with mock input (8M messages) is 
processed in 8 secs (down from the original 30 secs), i.e. 1M messages/s on a 
single CPU core.
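The wall-clock timings above convert to throughput as follows, assuming each 
timing covers the same 8M-message input (the 14-sec figure is the run with only 
timer metrics turned off):

```java
/** Converts the wall-clock timings reported above into messages per second. */
final class ThroughputSketch {

  static long messagesPerSecond(long messages, double seconds) {
    return Math.round(messages / seconds);
  }

  public static void main(String[] args) {
    long messages = 8_000_000L; // mock input size of the test job
    System.out.println("timer metrics on:      " + messagesPerSecond(messages, 30)); // 266667
    System.out.println("timer metrics off:     " + messagesPerSecond(messages, 14)); // 571429
    System.out.println("all three changes:     " + messagesPerSecond(messages, 8));  // 1000000
    System.out.printf("overall speedup: %.2fx%n", 30.0 / 8.0);                      // 3.75x
  }
}
```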


Diffs
-----

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 609a956a1f2fa97419c2f66fe2fb6876aaaeecd0
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 8fac8155c7f64e67d4a39ec6943f98da1e1d63d9
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 052b3b91ec609ca6288662cfa2d3e71b0273d020
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 9b700998d2af040c6734289f7f28bbd78c36bd2c
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 132cf59eb593524a4cac134aeceeeb37a4c74b1f
  samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION
  samza-core/src/main/java/org/apache/samza/util/Utils.java 472e0a59d5aa992b136292c8a3347c311e2cd606
  samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e0468ee89c89fd720834461771ebb36475475bcb
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala e2aed5b1c2e77a914268963b21809380972037b6
  samza-core/src/main/scala/org/apache/samza/util/Util.scala c4836f202f7eda1d4e71eac94fd48e46207b0316
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 6000ffaf2b8723d48a72e58b571f242a42dc8128
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 99e1e18bcfa6bca1e275d8ae030a77ff8d70a4eb
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java f1dbf35165e6ddfc02e3522887c25d78a4bbfcd7
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java d7110f34a9eae6e9ffc15b4982bfbb180da88b2d
  samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala c975893a42689732c39c39600fecacee843bf9d6

Diff: https://reviews.apache.org/r/53282/diff/


Testing
-------

./gradlew build

Tested on a YARN Hadoop cluster with different kinds of jobs.


Thanks,

Xinyu Liu
