[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072840#comment-17072840 ]
Luke Cwik commented on BEAM-8944:
---------------------------------

The original analysis in the description of the bug isn't typical usage, because it compares creating `M` threads that each process one item against creating a fixed number of threads that then process the items off a queue. It is easy to see that the performance difference will be in the number of threads created. It is expected that in typical usage threads would be reused many times before being garbage collected.

Max, for your analysis in the above comment:
Is Flink now able to schedule so much more work, since the Python SDK harness will accept an unbounded number of concurrent work items due to the UnboundedThreadPoolExecutor vs. the fixed number it supported before (so increased contention)?
Is it possible that ThreadPoolExecutor has a C++ implementation which is making up for the difference?
Do you have timing information from the two runs that compares the two implementations?
Do you have a graph showing how many threads are alive in the Python process between the two runs?

> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-8944
>                 URL: https://issues.apache.org/jira/browse/BEAM-8944
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.18.0
>            Reporter: Yichi Zhang
>            Priority: Critical
>             Fix For: 2.18.0
>
>         Attachments: checkpoint-duration.png, profiling.png,
> profiling_one_thread.png, profiling_twelve_threads.png
>
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> We are seeing a performance degradation for Python streaming word count load
> tests.
>
> After some investigation, it appears to be caused by swapping the original
> ThreadPoolExecutor to UnboundedThreadPoolExecutor in the SDK worker. Suspicion is
> that Python performance is worse with more threads on CPU-bound tasks.
>
> A simple test comparing the performance of the different thread pool executors:
>
> {code:python}
> import queue
> import time
> from concurrent import futures
>
> from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
>
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 1000000
>     q = queue.Queue()
>
>     def task(number):
>       # Trivial CPU-bound work; each task re-enqueues one new item.
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     # Seed the queue so that up to 200 items are in flight.
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> Profiling:
> UnboundedThreadPoolExecutor:
> !profiling.png!
> 1 Thread ThreadPoolExecutor:
> !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor:
> !profiling_twelve_threads.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
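
The distinction drawn in the comment (one new thread per item vs. a fixed set of threads that are reused), and the question about how many threads are alive, can be sketched with the standard library alone. This is only a minimal sketch and not the Beam implementation: `work`, `run_thread_per_item`, `run_fixed_pool` and `peak_live_threads` are hypothetical names introduced for illustration, the task is a stand-in, and the thread-per-item variant joins each thread right away so that it mostly measures thread creation cost. No timings are claimed here; they will vary by machine and Python version.

```python
import threading
import time
from concurrent import futures


def work(n):
    # Hypothetical stand-in for a small CPU-bound task.
    return hash(n)


def run_thread_per_item(items):
    """Create a brand-new thread for every item (no thread reuse)."""
    results = []
    lock = threading.Lock()

    def target(n):
        r = work(n)
        with lock:
            results.append(r)

    start = time.perf_counter()
    for n in items:
        t = threading.Thread(target=target, args=(n,))
        t.start()
        t.join()  # joined immediately: isolates the cost of thread creation
    return len(results), time.perf_counter() - start


def run_fixed_pool(items, max_workers):
    """Process all items on a fixed number of reused worker threads."""
    start = time.perf_counter()
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(work, items))
    return len(results), time.perf_counter() - start


def peak_live_threads(fn, *args, interval=0.001):
    """Run fn(*args) while a sampler thread records threading.active_count().

    Returns (fn's result, highest thread count observed). The count includes
    the main thread and the sampler itself.
    """
    samples = [threading.active_count()]
    stop = threading.Event()

    def sampler():
        while not stop.is_set():
            samples.append(threading.active_count())
            stop.wait(interval)

    s = threading.Thread(target=sampler)
    s.start()
    try:
        result = fn(*args)
    finally:
        stop.set()
        s.join()
    return result, max(samples)


if __name__ == '__main__':
    items = range(10000)
    print('thread per item:', run_thread_per_item(items))
    print('fixed pool (12):', run_fixed_pool(items, 12))
    print('peak live threads, fixed pool (12):',
          peak_live_threads(run_fixed_pool, items, 12)[1])
```

Wrapping the executor under test in something like `peak_live_threads` would give the per-run thread counts asked about above, although sampling at a fixed interval can miss short-lived threads.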