Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-09 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review155478
---


Ship it!




Looks pretty good, thanks!


samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java (line 134)


Maybe make private if not being tested.



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 23)


Unused import?



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 393)


s/TimerClock/HighResolutionClock


- Prateek Maheshwari


On Nov. 9, 2016, 11:10 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 9, 2016, 11:10 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 
> Description
> ---
> 
> In recent experiments with Samza batch jobs (consuming HDFS data on Hadoop), 
> the results were inferior to MapReduce and Spark. By looking closely at the 
> metrics, we found two basic problems:
> 
> 1) Not enough data to process. We spotted this because the unprocessed 
> message queue length was zero a large fraction of the time.
> 
> 2) Not processing fast enough. Samza's message throughput was about the same 
> for medium-sized records (100B) and small records (10B), while Spark scaled 
> very well on small records (over 1M/s).
> 
> The first problem is solved by increasing the buffer size. This ticket 
> addresses the second problem with three major improvements:
> 
> - Option to turn off timer metrics calculation: a large share of Samza's 
> processing time turned out to be spent just maintaining the timer metrics. 
> While they are useful for debugging, they become a bottleneck when running a 
> stable, high-performance job. My test job, which consumes 8M mock messages, 
> took 30 secs with timer metrics on and only 14 secs with them off.
> 
> - Java coding improvements: the AsyncRunLoop code can be further optimized 
> for efficiency. Some of the thread-safe data structures I was using 
> (Collections.synchronizedSet) do not give optimal performance. I switched to 
> CopyOnWriteArraySet, which performs far better here because reads greatly 
> outnumber writes and the sets are small.
> 
> - Specific handling for in-order processing: AsyncRunLoop handled callbacks 
> the same way regardless of whether processing is in-order or out-of-order 
> (max concurrency > 1), which incurs considerable cost. Simplifying the logic 
> for the in-order case improves performance.
> 
> After all three improvements, my test job with mock input (8M messages) can 
> be processed within 8 secs (down from the original 30 secs), i.e. 1M 
> messages/s on one CPU core.
> 
> For the performance benchmark jobs running in Hadoop, we also see a 4x 
> improvement with all the fixes above. Please take a look at the attached 
> spreadsheet (see the numbers for fix (timer metrics turned off) and fix2 
> (all three improvements together)).
> 
> 
> Diffs
> -
> 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> 609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> 8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> 052b3b91ec609ca6288662cfa2d3e71b0273d020 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> 9b700998d2af040c6734289f7f28bbd78c36bd2c 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> 132cf59eb593524a4cac134aeceeeb37a4c74b1f 
>   samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
> 69ba441ed087305dfe4e1272b00fad67b644e13f 
>   samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
> 2e65b603bc959273e679eba3ea9f89c7c0c4664d 
>   samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
> d1298fc40680e5ad4db7067c9ef02f0266dffc1d 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java 
> 

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-09 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/
---

(Updated Nov. 9, 2016, 7:10 p.m.)


Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.


Changes
---

Some final touches on the RB. Please take a look.


Repository: samza


Description
---

In recent experiments with Samza batch jobs (consuming HDFS data on Hadoop), 
the results were inferior to MapReduce and Spark. By looking closely at the 
metrics, we found two basic problems:

1) Not enough data to process. We spotted this because the unprocessed message 
queue length was zero a large fraction of the time.

2) Not processing fast enough. Samza's message throughput was about the same 
for medium-sized records (100B) and small records (10B), while Spark scaled 
very well on small records (over 1M/s).

The first problem is solved by increasing the buffer size. This ticket 
addresses the second problem with three major improvements:

- Option to turn off timer metrics calculation: a large share of Samza's 
processing time turned out to be spent just maintaining the timer metrics. 
While they are useful for debugging, they become a bottleneck when running a 
stable, high-performance job. My test job, which consumes 8M mock messages, 
took 30 secs with timer metrics on and only 14 secs with them off.

- Java coding improvements: the AsyncRunLoop code can be further optimized for 
efficiency. Some of the thread-safe data structures I was using 
(Collections.synchronizedSet) do not give optimal performance. I switched to 
CopyOnWriteArraySet, which performs far better here because reads greatly 
outnumber writes and the sets are small.

- Specific handling for in-order processing: AsyncRunLoop handled callbacks 
the same way regardless of whether processing is in-order or out-of-order 
(max concurrency > 1), which incurs considerable cost. Simplifying the logic 
for the in-order case improves performance.
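One way the first improvement (optional timer metrics) could be wired, sketched under the assumption that all timing goes through a single-method clock like the HighResolutionClock discussed in this review: pick the clock once at startup, and when timers are disabled use a clock that always returns 0. `SimpleTimer`, `TimerToggleSketch`, the `timersEnabled` flag and the dummy `process` loop are hypothetical names for illustration; this is not the actual RunLoopFactory, MetricsConfig or Util.scala code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-method clock (nanoTime() only), so a lambda or a
// method reference can implement it.
interface HighResolutionClock {
  long nanoTime();
}

// Hypothetical stand-in for a timer metric: accumulates total nanoseconds.
class SimpleTimer {
  private final AtomicLong totalNanos = new AtomicLong();

  void update(long nanos) { totalNanos.addAndGet(nanos); }

  long total() { return totalNanos.get(); }
}

class TimerToggleSketch {
  // Choose the clock once, from a (hypothetical) "timers enabled" config flag.
  static HighResolutionClock clockFor(boolean timersEnabled) {
    // When timers are off, every reading is 0: System.nanoTime() is never
    // called on the hot path and every recorded duration is 0.
    return timersEnabled ? System::nanoTime : () -> 0L;
  }

  // Processes n dummy messages, timing each one with the given clock.
  static long process(int n, HighResolutionClock clock, SimpleTimer timer) {
    long checksum = 0;
    for (int i = 0; i < n; i++) {
      long start = clock.nanoTime();
      checksum += i;                          // stand-in for real message work
      timer.update(clock.nanoTime() - start); // per-message elapsed time
    }
    return checksum;
  }
}
```

Selecting the clock once keeps the per-message path free of config checks; in this sketch the trade-off is that disabled timers still receive updates, just with a value of 0.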

After all three improvements, my test job with mock input (8M messages) can be 
processed within 8 secs (down from the original 30 secs), i.e. 1M messages/s on 
one CPU core.
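For reference, the single-core numbers above work out as follows, using only the 8M-message mock job and the times quoted in this description:

```latex
\begin{aligned}
\text{timer metrics on:}  &\quad \frac{8\times10^{6}\ \text{msgs}}{30\ \text{s}} \approx 2.7\times10^{5}\ \text{msgs/s} \\
\text{timer metrics off:} &\quad \frac{8\times10^{6}\ \text{msgs}}{14\ \text{s}} \approx 5.7\times10^{5}\ \text{msgs/s} \\
\text{all three fixes:}   &\quad \frac{8\times10^{6}\ \text{msgs}}{8\ \text{s}} = 1.0\times10^{6}\ \text{msgs/s} \\
\text{overall speedup:}   &\quad \frac{30\ \text{s}}{8\ \text{s}} = 3.75
\end{aligned}
```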

For the performance benchmark jobs running in Hadoop, we also see a 4x 
improvement with all the fixes above. Please take a look at the attached 
spreadsheet (see the numbers for fix (timer metrics turned off) and fix2 (all 
three improvements together)).


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
052b3b91ec609ca6288662cfa2d3e71b0273d020 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
9b700998d2af040c6734289f7f28bbd78c36bd2c 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
132cf59eb593524a4cac134aeceeeb37a4c74b1f 
  samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
69ba441ed087305dfe4e1272b00fad67b644e13f 
  samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
2e65b603bc959273e679eba3ea9f89c7c0c4664d 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
d1298fc40680e5ad4db7067c9ef02f0266dffc1d 
  samza-core/src/main/java/org/apache/samza/util/Utils.java 
472e0a59d5aa992b136292c8a3347c311e2cd606 
  samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
e0468ee89c89fd720834461771ebb36475475bcb 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
e2aed5b1c2e77a914268963b21809380972037b6 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 
c4836f202f7eda1d4e71eac94fd48e46207b0316 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
6000ffaf2b8723d48a72e58b571f242a42dc8128 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
99e1e18bcfa6bca1e275d8ae030a77ff8d70a4eb 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
f1dbf35165e6ddfc02e3522887c25d78a4bbfcd7 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
d7110f34a9eae6e9ffc15b4982bfbb180da88b2d 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
0276e6b17bc9ad9413611189b2e9ff2b8793694c 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala 
c975893a42689732c39c39600fecacee843bf9d6 

Diff: https://reviews.apache.org/r/53282/diff/


Testing
---

./gradlew build

Tested in the YARN Hadoop cluster with different kinds of jobs.


File Attachments


hdfs performance
  

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Xinyu Liu


> On Nov. 2, 2016, 7:01 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/util/TimerClock.java, line 25
> > 
> >
> > We have a HighResolutionClock that does this. I think you can use it 
> > here.
> 
> Xinyu Liu wrote:
> I was thinking about using it at first, but HighResolutionClock has two 
> methods, nanoTime() and sleep(), and the second one does not fit my usage 
> here. The other benefit of a single-method interface is that I can 
> implement it with a lambda, which makes the code nicer to read :).
> 
> Or I can remove the nanoTime() method from HighResolutionClock and let 
> HighResolutionClock extend TimerClock. Does this sound better to you?
> 
> Chris Pettitt wrote:
> I would probably pull sleep off of HighResolutionClock. It looks like it 
> may only be used in one place now, which means it could just be inlined. 
> Otherwise it could be added to a util class.

Good suggestion. I inlined the sleep into ThrottlingExecutor, so now we only 
have a single HighResolutionClock class.
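A minimal sketch of the refactoring agreed above: the clock keeps only nanoTime(), so a lambda or method reference can implement it, and the sleep that used to live on the clock is done directly inside the executor. `ThrottleSketch`, its `workFactor` parameter and the `sleepNanos` helper are assumptions for illustration; this is not the actual ThrottlingExecutor code.

```java
import java.util.concurrent.TimeUnit;

// Single-method clock: nanoTime() only, so it can be a lambda.
interface HighResolutionClock {
  long nanoTime();
}

// Hypothetical throttler: runs a task, then sleeps so that work takes up
// roughly `workFactor` of the total wall-clock time.
class ThrottleSketch {
  private final HighResolutionClock clock;
  private final double workFactor; // fraction of time spent working, in (0, 1]

  ThrottleSketch(HighResolutionClock clock, double workFactor) {
    if (workFactor <= 0.0 || workFactor > 1.0) {
      throw new IllegalArgumentException("workFactor must be in (0, 1]");
    }
    this.clock = clock;
    this.workFactor = workFactor;
  }

  // Runs the task and returns the delay (in ns) that was slept afterwards.
  long execute(Runnable task) throws InterruptedException {
    long start = clock.nanoTime();
    task.run();
    long workNanos = clock.nanoTime() - start;
    // Work W at factor f is followed by W * (1 - f) / f of idle time,
    // so work is f of the total W / f.
    long delayNanos = (long) (workNanos * (1.0 - workFactor) / workFactor);
    sleepNanos(delayNanos);
    return delayNanos;
  }

  // The sleep formerly on the clock interface, now inlined in the executor.
  private static void sleepNanos(long nanos) throws InterruptedException {
    if (nanos > 0) {
      TimeUnit.NANOSECONDS.sleep(nanos);
    }
  }
}
```

Because the clock is injected, a test can pass a fake lambda clock and check the computed delay without depending on real elapsed time.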


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154611
---


On Nov. 9, 2016, 2:14 a.m., Xinyu Liu wrote:

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Xinyu Liu


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 65
> > 
> >
> > I don't think we should ever be creating a default new SerdeManager 
> > just for this class. Same for SystemConsumerMetrics (and other places where 
> > this pattern is used for objects). Constants are fine.
> 
> Xinyu Liu wrote:
> I think these are created as defaults for testing purposes. Normally we 
> pass in the real SerdeManager.
> 
> Prateek Maheshwari wrote:
> I'd still argue that we shouldn't do this.
> 1. It makes it possible to accidentally forget to pass objects that are 
> actually necessary for this class to be functional.
> 2. Looking at the signature of these constructors, there's no indication 
> whether the field is really required or optional.
> 3. We rely on this pattern in other places for automatically creating 
> objects. It makes it harder to look up which classes are being used where.
> 
> The only benefit is saving a few characters when instantiating in tests. 
> Would strongly prefer always passing objects explicitly.

Talked offline. We are not going to refactor this legacy stuff for now.


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154602
---


On Nov. 9, 2016, 2:14 a.m., Xinyu Liu wrote:

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/
---

(Updated Nov. 9, 2016, 2:14 a.m.)


Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.


Changes
---

Updates from Chris's feedback.


Repository: samza


Description
---

In recent experiments with Samza batch jobs (consuming HDFS data on Hadoop), 
the results were inferior to MapReduce and Spark. By looking closely at the 
metrics, we found two basic problems:

1) Not enough data to process. We spotted this because the unprocessed message 
queue length was zero a large fraction of the time.

2) Not processing fast enough. Samza's message throughput was about the same 
for medium-sized records (100B) and small records (10B), while Spark scaled 
very well on small records (over 1M/s).

The first problem is solved by increasing the buffer size. This ticket 
addresses the second problem with three major improvements:

- Option to turn off timer metrics calculation: a large share of Samza's 
processing time turned out to be spent just maintaining the timer metrics. 
While they are useful for debugging, they become a bottleneck when running a 
stable, high-performance job. My test job, which consumes 8M mock messages, 
took 30 secs with timer metrics on and only 14 secs with them off.

- Java coding improvements: the AsyncRunLoop code can be further optimized for 
efficiency. Some of the thread-safe data structures I was using 
(Collections.synchronizedSet) do not give optimal performance. I switched to 
CopyOnWriteArraySet, which performs far better here because reads greatly 
outnumber writes and the sets are small.

- Specific handling for in-order processing: AsyncRunLoop handled callbacks 
the same way regardless of whether processing is in-order or out-of-order 
(max concurrency > 1), which incurs considerable cost. Simplifying the logic 
for the in-order case improves performance.
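A minimal sketch of the data-structure swap described in the second bullet, assuming a small set of task names that is read often and modified rarely (`PendingTasksSketch` and its methods are hypothetical, not AsyncRunLoop code). With `Collections.synchronizedSet`, every operation takes the set's lock and iteration must additionally be wrapped in `synchronized (set)` by the caller; `CopyOnWriteArraySet` lets reads and iteration run without locking, at the cost of copying the backing array on every write, which stays cheap only while the set is small.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

class PendingTasksSketch {
  // Before: a lock-guarded HashSet.
  static Set<String> newSynchronizedSet() {
    return Collections.synchronizedSet(new HashSet<>());
  }

  // After: lock-free reads; each add/remove copies the (small) backing array.
  static CopyOnWriteArraySet<String> newCopyOnWriteSet() {
    return new CopyOnWriteArraySet<>();
  }

  // Iterating a synchronizedSet requires holding its monitor by hand.
  static int countSynchronized(Set<String> syncSet, String prefix) {
    synchronized (syncSet) {
      int n = 0;
      for (String t : syncSet) {
        if (t.startsWith(prefix)) n++;
      }
      return n;
    }
  }

  // Iterating a CopyOnWriteArraySet walks an immutable snapshot: no lock is
  // needed and concurrent adds/removes cannot cause a
  // ConcurrentModificationException.
  static int countCopyOnWrite(CopyOnWriteArraySet<String> cowSet, String prefix) {
    int n = 0;
    for (String t : cowSet) {
      if (t.startsWith(prefix)) n++;
    }
    return n;
  }
}
```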

After all three improvements, my test job with mock input (8M messages) can be 
processed within 8 secs (down from the original 30 secs), i.e. 1M messages/s on 
one CPU core.
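The in-order special case from the third bullet can be sketched as follows, assuming (hypothetically) that completions are tracked by per-message sequence numbers. With max concurrency > 1, callbacks may finish out of order, so finished sequence numbers must be buffered (a `TreeSet` here, an illustrative choice) until every earlier one is done; with max concurrency 1 only one callback is ever outstanding, so its completion can advance the position directly. `CallbackTrackerSketch` is not the TaskCallbackManager API.

```java
import java.util.TreeSet;

class CallbackTrackerSketch {
  private final boolean inOrder;                           // max concurrency == 1
  private final TreeSet<Long> completed = new TreeSet<>(); // out-of-order only
  private long nextToCommit = 0; // lowest sequence number not yet completed

  CallbackTrackerSketch(int maxConcurrency) {
    this.inOrder = maxConcurrency == 1;
  }

  // Called (possibly from several threads) when the callback for `seq`
  // completes. Returns the lowest sequence number still outstanding, i.e. how
  // many messages, counting from 0, are fully done and safe to checkpoint.
  synchronized long onComplete(long seq) {
    if (inOrder) {
      // Fast path: the single outstanding callback is always the next one,
      // so no buffering or sorting is needed.
      nextToCommit = seq + 1;
      return nextToCommit;
    }
    // General path: buffer the completion, then advance past every
    // contiguous sequence number that has completed.
    completed.add(seq);
    while (!completed.isEmpty() && completed.first() == nextToCommit) {
      completed.pollFirst();
      nextToCommit++;
    }
    return nextToCommit;
  }
}
```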

For the performance benchmark jobs running in Hadoop, we also see a 4x 
improvement with all the fixes above. Please take a look at the attached 
spreadsheet (see the numbers for fix (timer metrics turned off) and fix2 (all 
three improvements together)).


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
052b3b91ec609ca6288662cfa2d3e71b0273d020 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
9b700998d2af040c6734289f7f28bbd78c36bd2c 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
132cf59eb593524a4cac134aeceeeb37a4c74b1f 
  samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
69ba441ed087305dfe4e1272b00fad67b644e13f 
  samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
2e65b603bc959273e679eba3ea9f89c7c0c4664d 
  samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
d1298fc40680e5ad4db7067c9ef02f0266dffc1d 
  samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/Utils.java 
472e0a59d5aa992b136292c8a3347c311e2cd606 
  samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
e0468ee89c89fd720834461771ebb36475475bcb 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
e2aed5b1c2e77a914268963b21809380972037b6 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 
c4836f202f7eda1d4e71eac94fd48e46207b0316 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
6000ffaf2b8723d48a72e58b571f242a42dc8128 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
99e1e18bcfa6bca1e275d8ae030a77ff8d70a4eb 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
f1dbf35165e6ddfc02e3522887c25d78a4bbfcd7 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
d7110f34a9eae6e9ffc15b4982bfbb180da88b2d 
  samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java 
0276e6b17bc9ad9413611189b2e9ff2b8793694c 
  samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala 
c975893a42689732c39c39600fecacee843bf9d6 

Diff: https://reviews.apache.org/r/53282/diff/


Testing
---

./gradlew build

Tested in the yarn hadoop cluster with different kinds of jobs.


File Attachments


hdfs performance
  

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review155376
---




samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 155)


Maybe 'timer' or 'highResolutionClock' instead, since 'clock' can be confused 
with currentTimeMillis? As Jagadish pointed out in an unrelated context, they're 
not equivalent: one returns epoch time and the other is only useful for time 
differences.
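The distinction drawn here can be shown with the two JDK calls directly: `System.currentTimeMillis()` is wall-clock time since the Unix epoch, meaningful on its own but subject to system clock adjustments, whereas `System.nanoTime()` has an arbitrary origin and is only meaningful as the difference of two readings in the same JVM. A clock for timer metrics produces nanoTime-style readings, which is why a name like highResolutionClock is less ambiguous. `ClockKindsSketch` and its helpers are illustrative names.

```java
import java.util.concurrent.TimeUnit;

class ClockKindsSketch {
  // Epoch time: a point in time, e.g. for timestamps in logs or messages.
  static long epochMillis() {
    return System.currentTimeMillis();
  }

  // Elapsed time: only the difference of two nanoTime() readings is meaningful.
  static long elapsedNanos(Runnable work) {
    long start = System.nanoTime();
    work.run();
    return System.nanoTime() - start;
  }

  static long elapsedMillis(Runnable work) {
    return TimeUnit.NANOSECONDS.toMillis(elapsedNanos(work));
  }
}
```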



samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala (line 138)


Only tangentially related to RB, but this feels wrong in terms of config 
(and code) dependencies: StorageEngine is reading the Metrics configuration out 
of the entire container context, so that it can choose what clock the Timer 
should use.

Haven't thought this through, but maybe we should refactor things to move 
the clock to the Timer class instead?
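One possible shape for this refactoring, as a sketch only (the comment itself says it hasn't been thought through): the timer decides how to time, so a storage engine just wraps work in `time(...)` and never reads the metrics configuration itself. `OwnClockTimer` and its `enabled` constructor flag are hypothetical and are not Samza's existing Timer metric class.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

interface HighResolutionClock {
  long nanoTime();
}

class OwnClockTimer {
  private static final HighResolutionClock DISABLED = () -> 0L;

  private final HighResolutionClock clock;
  private final AtomicLong totalNanos = new AtomicLong();
  private final AtomicLong count = new AtomicLong();

  // The enabled/disabled decision is made once, where the timer is created
  // (e.g. by whoever owns the metrics config), not by each component using it.
  OwnClockTimer(boolean enabled) {
    this.clock = enabled ? System::nanoTime : DISABLED;
  }

  // Runs the work, records its duration with this timer's own clock, and
  // returns the work's result.
  <T> T time(Supplier<T> work) {
    long start = clock.nanoTime();
    try {
      return work.get();
    } finally {
      totalNanos.addAndGet(clock.nanoTime() - start);
      count.incrementAndGet();
    }
  }

  long totalNanos() { return totalNanos.get(); }

  long count() { return count.get(); }
}
```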


- Prateek Maheshwari


On Nov. 7, 2016, 4:47 p.m., Xinyu Liu wrote:

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-08 Thread Prateek Maheshwari


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> >

Sorry for the late reply, didn't get an email notification for your replies.


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 65
> > 
> >
> > I don't think we should ever be creating a default new SerdeManager 
> > just for this class. Same for SystemConsumerMetrics (and other places where 
> > this pattern is used for objects). Constants are fine.
> 
> Xinyu Liu wrote:
> I think these are created as default for the testing purpose. Normally we 
> will pass in the real SerdeManager.

I'd still argue that we shouldn't do this.
1. It makes it possible to accidentally forget to pass objects that are 
actually necessary for this class to be functional.
2. Looking at the signature of these constructors, there's no indication 
whether the field is really required or optional.
3. We rely on this pattern in other places for automatically creating objects. 
It makes it harder to look up which classes are being used where.

The only benefit is saving a few characters when instantiating in tests. Would 
strongly prefer always passing objects explicitly.
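To illustrate this point in Java terms (the code under review is Scala, where the pattern appears as default constructor arguments that create a new SerdeManager or SystemConsumerMetrics): requiring every object dependency in the constructor makes the signature state what is needed and turns a forgotten dependency into an immediate failure, while plain constants can keep defaults. `Serde`, `ConsumerMetrics`, `ConsumersSketch` and `pollIntervalMs` are hypothetical stand-ins, not Samza classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical collaborators standing in for SerdeManager / SystemConsumerMetrics.
class Serde {
  String deserialize(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

class ConsumerMetrics {
  long messagesRead = 0;
}

class ConsumersSketch {
  static final int DEFAULT_POLL_INTERVAL_MS = 50; // constants may have defaults

  private final Serde serde;
  private final ConsumerMetrics metrics;
  private final int pollIntervalMs;

  // Every object dependency is explicit and checked: a missing one fails at
  // construction time instead of silently using a freshly created default.
  ConsumersSketch(Serde serde, ConsumerMetrics metrics, int pollIntervalMs) {
    this.serde = Objects.requireNonNull(serde, "serde");
    this.metrics = Objects.requireNonNull(metrics, "metrics");
    this.pollIntervalMs = pollIntervalMs;
  }

  // Convenience overload: only the constant gets a default.
  ConsumersSketch(Serde serde, ConsumerMetrics metrics) {
    this(serde, metrics, DEFAULT_POLL_INTERVAL_MS);
  }

  String read(byte[] raw) {
    metrics.messagesRead++;
    return serde.deserialize(raw);
  }

  int pollIntervalMs() { return pollIntervalMs; }
}
```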


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 397
> > 
> >
> > This seems like a weird method to have. Would prefer to remove.
> 
> Xinyu Liu wrote:
> This makes it a lot easier to convert a Java TimerClock object to a Scala 
> function that returns the time. I agree it's not pretty, but some 
> workaround is needed for this conversion right now.

Can we pass TimerClock to Scala classes too? Should be cleaner.


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/util/TimerClock.java, line 25
> > 
> >
> > Can we just add this method to the existing Clock interface? Weird to 
> > have two clock interfaces.
> 
> Xinyu Liu wrote:
> Chris raised the same question. The other interface has an extra method I 
> don't need, plus I want to use a lambda for this. I think I can make 
> HighResolutionClock extend this one. Does that sound better?

As you mentioned earlier, moving the other method of HighResolutionClock into 
the calling class and using HighResolutionClock instead makes sense.


> On Nov. 2, 2016, 12:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 
> > 33
> > 
> >
> > Minor: s/timer/timers (same for field name)
> 
> Xinyu Liu wrote:
> I intended this config to turn all timer metrics on or off, so I think 
> this config name should be fine.

Plural seems more appropriate grammatically since this applies to all timers.


- Prateek


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154602
---


On Nov. 7, 2016, 4:47 p.m., Xinyu Liu wrote:

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-07 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/
---

(Updated Nov. 8, 2016, 12:47 a.m.)


Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.


Changes
---

Updates based on the previous feedback. Thanks a lot for the comments!


Repository: samza


Description
---

In recent experiments with a Samza batch job (consuming HDFS data on Hadoop), 
the results were subpar compared to MapReduce and Spark. By looking at the 
metrics closely, we found two basic problems:

1) Not enough data to process. We spotted this because the unprocessed message 
queue length was zero much of the time.

2) Not processing fast enough. We found that Samza performed about the same with 
medium-size records (100B) and small records (10B), while Spark scales very well 
with small records (over 1M/s).

The first problem is solved by increasing the buffer size. This ticket addresses 
the second problem with three major improvements:

- Option to turn off timer metrics calculation: a major share of Samza's 
processing time turns out to be spent just maintaining the timer metrics. While 
they are useful for debugging, they become a bottleneck for a stable job that 
needs high throughput. In my test job, which consumes 8M mock messages, 
processing took 30 secs with timer metrics on and only 14 secs with them off.

- Java coding improvements: The AsyncRunLoop code can be further optimized for 
efficiency. Some of the thread-safe data structures I was using 
(Collections.synchronizedSet) are not optimal for performance. I switched to 
CopyOnWriteArraySet, which performs far better here because reads dominate and 
the set is small.

- Specialized handling for in-order processing: AsyncRunLoop handled callbacks 
the same way regardless of whether processing is in-order or out-of-order (max 
concurrency > 1), which incurs considerable cost. Simplifying the logic for the 
in-order case improves performance.
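The data-structure change in the second item above can be sketched as follows. This is a minimal illustration, not the actual AsyncRunLoop code: it assumes a small, read-mostly set of hypothetical task names that is checked on every message and updated rarely (`InFlightTasks` and `CowSetDemo` are made-up names).

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical sketch: a small, read-mostly set shared between threads.
class InFlightTasks {
  // Before: Collections.synchronizedSet(new HashSet<>()) takes a lock on every
  // contains() call and requires holding the lock manually while iterating.
  // After: CopyOnWriteArraySet makes reads lock-free and instead copies the
  // backing array on each write, which is cheap when the set is small.
  private final Set<String> tasks = new CopyOnWriteArraySet<>();

  void start(String task) {
    tasks.add(task);
  }

  void finish(String task) {
    tasks.remove(task);
  }

  // Hot path: called per message, so it should not contend on a lock.
  boolean isBusy(String task) {
    return tasks.contains(task);
  }

  // The iterator walks a snapshot taken when the loop starts, so removing
  // entries inside the loop is safe and never throws
  // ConcurrentModificationException.
  int finishAll() {
    int finished = 0;
    for (String task : tasks) {
      tasks.remove(task);
      finished++;
    }
    return finished;
  }
}

class CowSetDemo {
  public static void main(String[] args) {
    InFlightTasks inFlight = new InFlightTasks();
    inFlight.start("Partition 0");
    inFlight.start("Partition 1");
    System.out.println(inFlight.isBusy("Partition 0")); // true
    System.out.println(inFlight.finishAll());           // 2
    System.out.println(inFlight.isBusy("Partition 1")); // false
  }
}
```

The trade-off is that every write copies the array, so this only pays off when, as described above, reads vastly outnumber writes and the set stays small.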

With all three improvements, my test job with mock input (8M messages) is 
processed within 8 secs (down from the original 30 secs), i.e. about 1M 
messages/s on one CPU core.

For the performance benchmark jobs running in Hadoop, we also see a 4x 
improvement with all the fixes above. Please take a look at the attached 
spreadsheet (see the numbers for fix (timer metrics turned off) and fix2 (all 
three together)).


Diffs (updated)
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
052b3b91ec609ca6288662cfa2d3e71b0273d020 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
9b700998d2af040c6734289f7f28bbd78c36bd2c 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
132cf59eb593524a4cac134aeceeeb37a4c74b1f 
  samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/Utils.java 
472e0a59d5aa992b136292c8a3347c311e2cd606 
  samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
e0468ee89c89fd720834461771ebb36475475bcb 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
e2aed5b1c2e77a914268963b21809380972037b6 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 
c4836f202f7eda1d4e71eac94fd48e46207b0316 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
6000ffaf2b8723d48a72e58b571f242a42dc8128 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
99e1e18bcfa6bca1e275d8ae030a77ff8d70a4eb 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
f1dbf35165e6ddfc02e3522887c25d78a4bbfcd7 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
d7110f34a9eae6e9ffc15b4982bfbb180da88b2d 
  
samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 c975893a42689732c39c39600fecacee843bf9d6 

Diff: https://reviews.apache.org/r/53282/diff/


Testing
---

./gradlew build

Tested in the YARN Hadoop cluster with different kinds of jobs.


File Attachments


hdfs performance
  
https://reviews.apache.org/media/uploaded/files/2016/11/02/c05007fe-2fdd-4c8c-b5ef-b7862dea13b2__hdfs_perf.png


Thanks,

Xinyu Liu



Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-07 Thread Xinyu Liu


> On Nov. 7, 2016, 5:27 p.m., Boris Shkolnik wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 550
> > 
> >
> > So we don't need a callback on failure anymore?

Yes. This is a cleanup of the previous code. TaskCallbackManager only handles 
completed callbacks, not failed ones.


> On Nov. 7, 2016, 5:27 p.m., Boris Shkolnik wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 589
> > 
> >
> > the queue size is 1 at maximum?

The queue size can be larger. The end-of-stream check requires that the queue 
holds exactly one message and that this message is the end-of-stream message.
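A minimal sketch of that check, assuming a hypothetical `Envelope` type with an `isEndOfStream()` flag and a plain `ArrayDeque` as the pending queue (the real AsyncRunLoop types are not shown in this thread):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for an incoming message envelope.
final class Envelope {
  private final boolean endOfStream;

  Envelope(boolean endOfStream) {
    this.endOfStream = endOfStream;
  }

  boolean isEndOfStream() {
    return endOfStream;
  }
}

final class PendingQueue {
  private final Deque<Envelope> queue = new ArrayDeque<>();

  void add(Envelope envelope) {
    queue.addLast(envelope);
  }

  Envelope poll() {
    return queue.pollFirst();
  }

  // The queue can hold many envelopes. It only counts as "at end of stream"
  // once all regular messages are drained and the single remaining entry is
  // the end-of-stream marker.
  boolean isEndOfStream() {
    return queue.size() == 1 && queue.peekFirst().isEndOfStream();
  }
}

final class EndOfStreamDemo {
  public static void main(String[] args) {
    PendingQueue pending = new PendingQueue();
    pending.add(new Envelope(false));
    pending.add(new Envelope(true));
    System.out.println(pending.isEndOfStream()); // false: a regular message is still queued
    pending.poll();
    System.out.println(pending.isEndOfStream()); // true
  }
}
```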


> On Nov. 7, 2016, 5:27 p.m., Boris Shkolnik wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 709
> > 
> >
> > Nice!

This actually helped performance a lot when I was looking for slowness in Samza. 
String manipulation becomes a performance problem when everything else takes 
under 1 microsecond.
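The change at line 709 is not shown in this thread. As a hedged illustration of the general point, one common way to keep string building off a sub-microsecond hot path is to defer it until the text is actually needed. The sketch below uses the `Supplier` overload of `java.util.logging.Logger.fine`; Samza's own logging setup differs, and `HotPath` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

final class HotPath {
  private static final Logger LOG = Logger.getLogger(HotPath.class.getName());

  // Counts how many debug strings were actually built (for illustration only).
  static final AtomicInteger stringsBuilt = new AtomicInteger();

  static void process(String partition, long offset) {
    // Eager version: the concatenation runs on every call, even when FINE is off.
    //   LOG.fine("Processing " + partition + " at offset " + offset);

    // Lazy version: the lambda body only runs if FINE is enabled for this logger.
    LOG.fine(() -> {
      stringsBuilt.incrementAndGet();
      return "Processing " + partition + " at offset " + offset;
    });
  }

  public static void main(String[] args) {
    for (long offset = 0; offset < 1_000_000; offset++) {
      process("Partition 0", offset);
    }
    // With the default JDK logging configuration (level INFO), no strings are built.
    System.out.println(stringsBuilt.get());
  }
}
```

A capturing lambda is still allocated on each call, so on the very hottest paths a plain `if (LOG.isLoggable(Level.FINE))` guard avoids even that cost.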


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154283
---


On Nov. 2, 2016, 5:56 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 2, 2016, 5:56 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-07 Thread Xinyu Liu


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 589
> > 
> >
> > Typo: Envelope

fixed. Thanks for catching this!


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java, 
> > line 81
> > 
> >
> > This would probably be good to have as info.

fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java, 
> > line 91
> > 
> >
> > Minor: s/completeCallbacks/completedCallbacks?

fixed


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java, 
> > line 91
> > 
> >
> > Minor: s/completeCallbacks/completedCallbacks?

Same as above.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java, 
> > line 99
> > 
> >
> > Minor: Is this within the max line width?

fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/util/TimerClock.java, line 25
> > 
> >
> > Can we just add this method to the existing Clock interface? Weird to 
> > have two clock interfaces.

Chris raised the same question. The other interface has an extra method I don't 
need, plus I want to use a lambda for this. I think I can make 
HighResolutionClock extend this one. Does that sound better?


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/util/Utils.java, line 38
> > 
> >
> > Prefer not having a util class for this one method, which I think is 
> > only used in one place? Also don't think this is worth having a util method 
> > for.

Good point. The method was only used by one class (RunLoopFactory), so I moved 
it directly into that class to avoid the util class.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 
> > 33
> > 
> >
> > Minor: s/timer/timers (same for field name)

I intended this config to mean that all timer metrics are turned on/off, so I 
think this config name should be fine.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 
> > 64
> > 
> >
> > Minor: s/using/use

fixed


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 
> > 69
> > 
> >
> > Don't need `return` here (and later).

fixed


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 65
> > 
> >
> > I don't think we should ever be creating a default new SerdeManager 
> > just for this class. Same for SystemConsumerMetrics (and other places where 
> > this pattern is used for objects). Constants are fine.

I think these defaults exist for testing purposes. Normally we pass in the real 
SerdeManager.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 397
> > 
> >
> > This seems like a weird method to have. Would prefer to remove.

This makes it a lot easier to convert a Java TimerClock object to a Scala 
function that returns the time. I agree it's not pretty, but some workaround is 
needed for this conversion right now.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, 
> > line 108
> > 
> >
> > Same here, not sure if we should use the default.

Same as above.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala,
> >  line 137
> > 

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-07 Thread Xinyu Liu


> On Nov. 2, 2016, 7:01 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/util/TimerClock.java, line 25
> > 
> >
> > We have a HighResolutionClock that does this. I think you can use it 
> > here.

I considered using it at first. But HighResolutionClock has two methods, 
nanoTime() and sleep(), and the second one doesn't fit my usage here. The other 
benefit of a single-method interface is that I can use a lambda, which makes the 
code nicer to read :).

Alternatively, I can remove nanoTime() from HighResolutionClock and let 
HighResolutionClock extend TimerClock. Does this sound better to you?
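A minimal sketch of the second option, assuming `TimerClock` keeps only `nanoTime()` and `HighResolutionClock` adds a sleep method. The `sleep(long)` signature below is an assumption, not copied from the Samza source, and `SystemHighResolutionClock` and `ClockDemo` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Single-method interface, so callers can supply a lambda or method reference.
@FunctionalInterface
interface TimerClock {
  long nanoTime();
}

// Assumed shape: inherits nanoTime() and adds sleeping, which only some callers need.
interface HighResolutionClock extends TimerClock {
  void sleep(long nanos) throws InterruptedException;
}

final class SystemHighResolutionClock implements HighResolutionClock {
  @Override
  public long nanoTime() {
    return System.nanoTime();
  }

  @Override
  public void sleep(long nanos) throws InterruptedException {
    TimeUnit.NANOSECONDS.sleep(nanos);
  }
}

final class ClockDemo {
  // Code that only reads time depends on the narrower TimerClock type.
  static long elapsed(TimerClock clock, Runnable work) {
    long start = clock.nanoTime();
    work.run();
    return clock.nanoTime() - start;
  }

  public static void main(String[] args) {
    TimerClock lambdaClock = System::nanoTime;              // method reference works directly
    TimerClock fullClock = new SystemHighResolutionClock(); // still usable as a TimerClock
    System.out.println(elapsed(lambdaClock, () -> { }) >= 0);
    System.out.println(elapsed(fullClock, () -> { }) >= 0);
  }
}
```

With this shape, timing code accepts any `TimerClock`, including a lambda, while code that also needs to sleep asks for the wider `HighResolutionClock`.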


- Xinyu


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154611
---


On Nov. 2, 2016, 5:56 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 2, 2016, 5:56 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-02 Thread Prateek Maheshwari

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154602
---




samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 588)


Typo: Envelope



samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java (line 
79)


This would probably be good to have as info.



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)


Minor: s/completeCallbacks/completedCallbacks?



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
88)


Minor: s/completeCallbacks/completedCallbacks?



samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java (line 
95)


Minor: Is this within the max line width?



samza-core/src/main/java/org/apache/samza/util/TimerClock.java (line 25)


Can we just add this method to the existing Clock interface? Weird to have 
two clock interfaces.



samza-core/src/main/java/org/apache/samza/util/Utils.java (line 38)


Prefer not having a util class for this one method, which I think is only 
used in one place? Also don't think this is worth having a util method for.




samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 32)


Minor: s/timer/timers (same for field name)



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 63)


Minor: s/using/use



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 66)


Config objects shouldn't return actual instances. It should just return the 
config value and call site should create the actual/no-op clock based on it and 
pass it around to whoever needs it.
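A minimal sketch of that suggestion, using a plain `Map` in place of Samza's `Config` and a hypothetical key `metrics.timer.enabled` (the real config name is not shown in this thread). The config accessor returns only a boolean, and the call site decides between a real clock and a no-op one. The "enabled by default" behavior is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

@FunctionalInterface
interface TimerClock {
  long nanoTime();
}

// Config accessor: returns the raw setting only, never a clock instance.
final class MetricsSettings {
  static final String TIMER_ENABLED = "metrics.timer.enabled"; // hypothetical key

  private final Map<String, String> config;

  MetricsSettings(Map<String, String> config) {
    this.config = config;
  }

  boolean getMetricsTimerEnabled() {
    // Assumption: timers stay on unless explicitly disabled.
    return Boolean.parseBoolean(config.getOrDefault(TIMER_ENABLED, "true"));
  }
}

final class ContainerWiring {
  // Call site: builds the clock once and passes it to whoever needs it.
  static TimerClock createClock(MetricsSettings settings) {
    return settings.getMetricsTimerEnabled()
        ? System::nanoTime // real clock: timers record actual durations
        : () -> 0L;        // no-op clock: every timer update measures 0 ns
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(MetricsSettings.TIMER_ENABLED, "false");
    TimerClock clock = createClock(new MetricsSettings(config));
    System.out.println(clock.nanoTime()); // 0
  }
}
```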



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 68)


Don't need `return` here (and later).



samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala (line 72)


Minor: Move to previous line.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 
65)


I don't think we should ever be creating a default new SerdeManager just 
for this class. Same for SystemConsumerMetrics (and other places where this 
pattern is used for objects). Constants are fine.



samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala (line 
108)


Same here, not sure if we should use the default.



samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 397)


This seems like a weird method to have. Would prefer to remove.



samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 (line 137)


See comment about passing this in instead.


- Prateek Maheshwari


On Nov. 2, 2016, 10:56 a.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 2, 2016, 10:56 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-02 Thread Chris Pettitt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154611
---


Fix it, then Ship it!




Can you get the numbers with all fixes but with instrumentation turned on? I 
don't think we'd ever run without the instrumentation on at LI. If 
instrumentation is a significant problem we should look at alternatives to 
tracking it or consider changing the granularity.
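One way to read "changing the granularity" is to time only a fraction of the messages. The sketch below (a hypothetical `SampledTimer`, not an existing Samza class) reads the clock for one call in every `sampleEvery`, so the clock-read cost is spread across many messages. It assumes a single-threaded caller and is an illustration of the idea, not a measured fix.

```java
import java.util.function.LongSupplier;

// Hypothetical sampled timer; not thread-safe (assumes a single run-loop thread).
final class SampledTimer {
  private final LongSupplier clock;
  private final int sampleEvery;
  private long counter = 0;
  private long samples = 0;
  private long totalNanos = 0;

  SampledTimer(LongSupplier clock, int sampleEvery) {
    if (sampleEvery < 1) {
      throw new IllegalArgumentException("sampleEvery must be >= 1");
    }
    this.clock = clock;
    this.sampleEvery = sampleEvery;
  }

  // Runs the work, reading the clock only on every sampleEvery-th call.
  void time(Runnable work) {
    if (counter++ % sampleEvery != 0) {
      work.run();
      return;
    }
    long start = clock.getAsLong();
    work.run();
    totalNanos += clock.getAsLong() - start;
    samples++;
  }

  long samples() {
    return samples;
  }

  // Mean duration of the sampled calls, or 0 if nothing has been sampled yet.
  double meanNanos() {
    return samples == 0 ? 0.0 : (double) totalNanos / samples;
  }

  public static void main(String[] args) {
    SampledTimer timer = new SampledTimer(System::nanoTime, 100);
    for (int i = 0; i < 10_000; i++) {
      timer.time(() -> { });
    }
    System.out.println(timer.samples()); // 100
  }
}
```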


samza-core/src/main/java/org/apache/samza/util/TimerClock.java (line 25)


We have a HighResolutionClock that does this. I think you can use it here.


- Chris Pettitt


On Nov. 2, 2016, 5:56 p.m., Xinyu Liu wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> ---
> 
> (Updated Nov. 2, 2016, 5:56 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
> 
> 
> Repository: samza
> 
> 

Re: Review Request 53282: SAMZA-1043: Samza performance improvements

2016-11-02 Thread Xinyu Liu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/
---

(Updated Nov. 2, 2016, 5:56 p.m.)


Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.


Repository: samza


Description (updated)
---

In recent experiments with a Samza batch job (consuming HDFS data on Hadoop), 
the results were subpar compared to MapReduce and Spark. By looking at the 
metrics closely, we found two basic problems:

1) Not enough data to process. We spotted this because the unprocessed message 
queue length was zero much of the time.

2) Not processing fast enough. We found that Samza performed about the same with 
medium-size records (100B) and small records (10B), while Spark scales very well 
with small records (over 1M/s).

The first problem is solved by increasing the buffer size. This ticket addresses 
the second problem with three major improvements:

- Option to turn off timer metrics calculation: a major share of Samza's 
processing time turns out to be spent just maintaining the timer metrics. While 
they are useful for debugging, they become a bottleneck for a stable job that 
needs high throughput. In my test job, which consumes 8M mock messages, 
processing took 30 secs with timer metrics on and only 14 secs with them off.

- Java coding improvements: The AsyncRunLoop code can be further optimized for 
efficiency. Some of the thread-safe data structures I was using 
(Collections.synchronizedSet) are not optimal for performance. I switched to 
CopyOnWriteArraySet, which performs far better here because reads dominate and 
the set is small.

- Specialized handling for in-order processing: AsyncRunLoop handled callbacks 
the same way regardless of whether processing is in-order or out-of-order (max 
concurrency > 1), which incurs considerable cost. Simplifying the logic for the 
in-order case improves performance.
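To illustrate why the in-order case described in the last item can be simpler, here is a hedged sketch using hypothetical names; it is not the actual TaskCallbackManager logic. With out-of-order completion, finished callbacks must be buffered until every earlier sequence number is done before an offset is safe to checkpoint. With max concurrency 1, each completion is already the next one, so it can be committed directly.

```java
import java.util.TreeSet;

// Hypothetical tracker of which callback sequence numbers are safe to commit.
final class CompletionTracker {
  private final boolean inOrder;
  // Sequence numbers completed ahead of a gap (only used when out of order).
  private final TreeSet<Long> pending = new TreeSet<>();
  private long nextExpected = 0; // lowest sequence number not yet committable

  CompletionTracker(boolean inOrder) {
    this.inOrder = inOrder;
  }

  /** Records a completed callback; returns the highest committable seq, or -1. */
  long complete(long seq) {
    if (inOrder) {
      // Max concurrency 1: callbacks finish in the order they started,
      // so no buffering or ordering work is needed.
      nextExpected = seq + 1;
      return seq;
    }
    pending.add(seq);
    // Advance past every contiguous completed sequence number.
    while (!pending.isEmpty() && pending.first() == nextExpected) {
      pending.pollFirst();
      nextExpected++;
    }
    return nextExpected - 1;
  }

  public static void main(String[] args) {
    CompletionTracker outOfOrder = new CompletionTracker(false);
    System.out.println(outOfOrder.complete(1)); // -1: seq 0 is still running
    System.out.println(outOfOrder.complete(0)); // 1: both 0 and 1 are now done
  }
}
```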

With all three improvements, my test job with mock input (8M messages) is 
processed within 8 secs (down from the original 30 secs), i.e. about 1M 
messages/s on one CPU core.

For the performance benchmark jobs running in Hadoop, we also see a 4x 
improvement with all the fixes above. Please take a look at the attached 
spreadsheet (see the numbers for fix (timer metrics turned off) and fix2 (all 
three together)).


Diffs
-

  samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
609a956a1f2fa97419c2f66fe2fb6876aaaeecd0 
  samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
8fac8155c7f64e67d4a39ec6943f98da1e1d63d9 
  samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
052b3b91ec609ca6288662cfa2d3e71b0273d020 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
9b700998d2af040c6734289f7f28bbd78c36bd2c 
  samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
132cf59eb593524a4cac134aeceeeb37a4c74b1f 
  samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/util/Utils.java 
472e0a59d5aa992b136292c8a3347c311e2cd606 
  samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
e0468ee89c89fd720834461771ebb36475475bcb 
  samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
e2aed5b1c2e77a914268963b21809380972037b6 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 
c4836f202f7eda1d4e71eac94fd48e46207b0316 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
6000ffaf2b8723d48a72e58b571f242a42dc8128 
  samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
99e1e18bcfa6bca1e275d8ae030a77ff8d70a4eb 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
f1dbf35165e6ddfc02e3522887c25d78a4bbfcd7 
  samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
d7110f34a9eae6e9ffc15b4982bfbb180da88b2d 
  
samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 c975893a42689732c39c39600fecacee843bf9d6 

Diff: https://reviews.apache.org/r/53282/diff/


Testing
---

./gradlew build

Tested in the YARN Hadoop cluster with different kinds of jobs.


File Attachments (updated)


hdfs performance
  
https://reviews.apache.org/media/uploaded/files/2016/11/02/c05007fe-2fdd-4c8c-b5ef-b7862dea13b2__hdfs_perf.png


Thanks,

Xinyu Liu