[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097665#comment-17097665 ] Luke Cwik commented on BEAM-8944:
-

I was able to come up with a compromise on how many idle threads sit around that I believe should work well; see https://github.com/apache/beam/pull/11590. [~mxm], could you try out the patch locally with a larger pipeline to see whether it addresses the issue, and by how much?

> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-8944
>                 URL: https://issues.apache.org/jira/browse/BEAM-8944
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.18.0
>            Reporter: Yichi Zhang
>            Priority: Critical
>         Attachments: checkpoint-duration.png, profiling.png, profiling_one_thread.png, profiling_twelve_threads.png
>
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> We are seeing a performance degradation for Python streaming word count load tests.
>
> After some investigation, it appears to be caused by swapping the original ThreadPoolExecutor for UnboundedThreadPoolExecutor in the SDK worker. The suspicion is that Python performance is worse with more threads on CPU-bound tasks.
>
> A simple test comparing the performance of the thread pool executors:
>
> {code:python}
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 100
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results, in run order:
> UnboundedThreadPoolExecutor (object at 0x7fab400dbe50) uses 268.160675049
> ThreadPoolExecutor(max_workers=1) uses 79.904583931
> ThreadPoolExecutor(max_workers=12) uses 191.179054976
>
> Profiling:
> UnboundedThreadPoolExecutor: !profiling.png!
> 1 Thread ThreadPoolExecutor: !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor: !profiling_twelve_threads.png!

-- This message was sent by Atlassian Jira (v8.3.4#803005)
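The compromise described in the comment above (create worker threads on demand, but cap how many idle threads linger) can be sketched as a toy pool. This is illustrative only, not the code from the linked PR; the name `BoundedIdlePool` and the `max_idle` knob are invented for the example:

```python
import queue
import threading


class BoundedIdlePool(object):
    """Toy pool: spawn a worker when nobody is idle, but let a worker
    die instead of waiting when `max_idle` workers are already idle."""

    def __init__(self, max_idle=4):
        self._max_idle = max_idle
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._idle = 0        # workers currently blocked waiting for work
        self._threads = []    # every worker thread ever started

    def submit(self, fn, *args):
        with self._lock:
            if self._idle == 0:
                t = threading.Thread(target=self._worker)
                t.daemon = True
                t.start()
                self._threads.append(t)
        self._queue.put((fn, args))

    def _worker(self):
        while True:
            with self._lock:
                if self._idle >= self._max_idle:
                    return    # enough idle capacity already; retire
                self._idle += 1
            item = self._queue.get()
            with self._lock:
                self._idle -= 1
            if item is None:  # shutdown sentinel
                return
            fn, args = item
            fn(*args)

    def shutdown(self):
        # One sentinel per thread ever started; extra sentinels for
        # already-retired workers are harmless leftovers in the queue.
        with self._lock:
            threads = list(self._threads)
        for _ in threads:
            self._queue.put(None)
        for t in threads:
            t.join()
```

Compared to a fully unbounded pool, bursts still fan out to new threads, but after the burst at most `max_idle` threads sit blocked on the queue.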
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17097302#comment-17097302 ] Maximilian Michels commented on BEAM-8944:
--

[~lcwik] For the time being, do you think we could support two modes: (1) dynamic thread allocation, and (2) a static number of threads? The second mode could be removed once we can ensure that the first performs as efficiently. We can default to mode (1). What do you think?
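A configuration switch between the two proposed modes could look like the sketch below. `make_executor` and the mode names are hypothetical, and the stdlib pool stands in for Beam's `UnboundedThreadPoolExecutor` in the dynamic branch so the sketch stays self-contained:

```python
from concurrent import futures


def make_executor(mode, max_workers=12):
    """Hypothetical selector for the two proposed worker-thread modes."""
    if mode == 'static':
        # Mode (2): a fixed number of threads, as before the change.
        return futures.ThreadPoolExecutor(max_workers=max_workers)
    if mode == 'dynamic':
        # Mode (1), the default: self-tuning allocation. Beam's
        # UnboundedThreadPoolExecutor would go here; a stdlib pool with
        # default sizing is used as a stand-in to keep this runnable.
        return futures.ThreadPoolExecutor()
    raise ValueError('unknown executor mode: %r' % mode)
```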
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074641#comment-17074641 ] Maximilian Michels commented on BEAM-8944:
--

Thank you for your thoughts, Luke. I agree that the best approach is to work on a solution based on profiling the current code. Here is my attempt to write a benchmark which most closely resembles our setup. There are at most two active bundles per SDK harness (remember, we use 16 of them and distribute bundles round-robin), hence two active tasks. This is adjustable in the {{run_benchmark}} method:

{code:python}
import time
from concurrent import futures
from future.moves import queue
from utils.thread_pool_executor import UnboundedThreadPoolExecutor
import cProfile as profiler


def run_benchmark(executor,
                  max_concurrent_threads=2,
                  total_iterations=10000,
                  warmup_iterations=100):
  q = queue.Queue()
  for i in range(max_concurrent_threads):
    q.put(i)

  def task(number):
    q.put(task)
    return number

  count = 0
  start_time = None
  profile = profiler.Profile()
  while count < total_iterations:
    if count == warmup_iterations:
      start_time = time.time()
      profile.enable()
    executor.submit(task, q.get(block=True))
    count += 1
  profile.disable()
  print('%s uses %s' % (executor, time.time() - start_time))
  profile.print_stats(sort='time')


if __name__ == '__main__':
  with UnboundedThreadPoolExecutor() as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=1) as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=12) as executor:
    run_benchmark(executor)
{code}

The results clearly reveal the cost of the current implementation:

{noformat}
UnboundedThreadPoolExecutor uses 7.32124900818
         575749 function calls in 7.302 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    83878    6.701    0.000    6.701    0.000 {method 'acquire' of 'thread.lock' objects}
    19800    0.087    0.000    6.958    0.000 Queue.py:150(get)
     8595    0.064    0.000    6.662    0.001 threading.py:309(wait)
    29698    0.062    0.000    0.157    0.000 threading.py:373(notify)
     9903    0.045    0.000    0.086    0.000 threading.py:260(__init__)
     9900    0.043    0.000    0.481    0.000 thread_pool_executor.py:134(submit)
     9899    0.040    0.000    0.176    0.000 thread_pool_executor.py:103(accepted_work)
    38293    0.028    0.000    0.099    0.000 threading.py:300(_is_owned)
     9899    0.025    0.000    0.136    0.000 threading.py:576(set)
     9900    0.020    0.000    0.020    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.020    0.000    0.117    0.000 _base.py:318(__init__)
     9900    0.014    0.000    0.024    0.000 threading.py:132(__init__)
    38294    0.013    0.000    0.013    0.000 threading.py:64(_note)
    28394    0.013    0.000    0.018    0.000 Queue.py:200(_qsize)
    19806    0.012    0.000    0.012    0.000 threading.py:59(__init__)
    19799    0.012    0.000    0.014    0.000 Queue.py:208(_get)
     9899    0.011    0.000    0.076    0.000 threading.py:400(notifyAll)
     9903    0.011    0.000    0.098    0.000 threading.py:242(Condition)
     8595    0.010    0.000    0.053    0.000 threading.py:297(_acquire_restore)
    18499    0.009    0.000    0.009    0.000 {thread.allocate_lock}
     9900    0.009    0.000    0.033    0.000 threading.py:114(RLock)
    38294    0.009    0.000    0.009    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
     9900    0.007    0.000    0.026    0.000 threading.py:285(__enter__)
    38293    0.007    0.000    0.007    0.000 {len}
     9900    0.007    0.000    0.008    0.000 threading.py:288(__exit__)
     9899    0.004    0.000    0.004    0.000 {method 'remove' of 'list' objects}
     8595    0.004    0.000    0.006    0.000 threading.py:294(_release_save)
     8595    0.003    0.000    0.003    0.000 {method 'append' of 'list' objects}
    19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
     9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
{noformat}
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073871#comment-17073871 ] Luke Cwik commented on BEAM-8944:
-

I was having difficulty producing a solution that both creates threads when needed and cleans up threads without emptying the pool completely. Some ideas I had were:
* Have a base pool of threads that never "exit" unless shutdown happens.
* Have a dedicated thread that checks the queue size every N seconds and spawns new threads for whatever is outstanding (lag in creation; maybe a non-issue if the base pool size is something like 8-16 threads).
* Whenever a worker gets an item from the queue, it checks the queue size and spawns a new worker thread if any items are there (will get stuck if work never completes).
* Only create threads on new submissions, with a certain amount of lag between creations.

All of these are worse than the idle worker queue. It would be good to know what was expensive in that solution so that we can address it directly, since the cost might just be coming from the "timed" wait aspect.
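The third idea in the list above (after each dequeue, a worker checks the queue and spawns a helper if items are still waiting) might look roughly like this toy sketch; the class `SpawnOnBacklogPool` and its internals are invented for illustration:

```python
import queue
import threading


class SpawnOnBacklogPool(object):
    """Toy sketch: grow the pool whenever a dequeue leaves a backlog."""

    def __init__(self, base_workers=2):
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._threads = []
        self._shutting_down = False
        for _ in range(base_workers):
            with self._lock:
                self._spawn_locked()

    def _spawn_locked(self):
        # Caller must hold self._lock.
        t = threading.Thread(target=self._worker)
        t.daemon = True
        t.start()
        self._threads.append(t)

    def submit(self, fn, *args):
        self._queue.put((fn, args))

    def _worker(self):
        while True:
            item = self._queue.get()
            if item is None:          # shutdown sentinel
                return
            # The idea from the comment: if work is still queued after
            # this dequeue, add capacity.
            with self._lock:
                if not self._shutting_down and not self._queue.empty():
                    self._spawn_locked()
            fn, args = item
            fn(*args)

    def shutdown(self):
        with self._lock:
            self._shutting_down = True
            threads = list(self._threads)
        for _ in threads:
            self._queue.put(None)     # one sentinel per worker
        for t in threads:
            t.join()
```

Note the caveat from the comment: workers here never time out, so if a task blocks forever, the pool only grows when later dequeues still find a backlog.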
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073657#comment-17073657 ] Maximilian Michels commented on BEAM-8944:
--

My first thought was to change the implementation to allow two worker thread allocation methods: (a) a static number of threads and (b) dynamically created worker threads. However, I don't like that the user has to worry about configuring such things. I very much prefer the self-tuning capabilities that we added with UnboundedThreadPoolExecutor; we just have to make sure there is no performance regression.

Thanks [~lcwik]. The implementation looks much simpler. I wonder, could we keep the thread timeout feature? I think it should be possible to build this without the use of threading.Event and threading.Condition. For example, we could add a check after distributing work which looks at the activity of a worker thread and terminates it if it hasn't been active for a while.
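One way the "terminate a worker that hasn't been active for a while" check could be built without threading.Event or threading.Condition is to have workers stamp a shared last-activity map and let the submit path retire stale workers with poison pills. Everything below (`ReapingPool`, `idle_timeout`) is a hypothetical sketch, not Beam code:

```python
import queue
import threading
import time


class ReapingPool(object):
    """Toy sketch: untimed Queue.get in workers; idleness is checked on
    the submit path, and stale workers are retired via poison pills."""

    def __init__(self, idle_timeout=30.0):
        self._queue = queue.Queue()
        self._idle_timeout = idle_timeout
        self._lock = threading.Lock()
        self._idle_since = {}  # worker name -> time it started waiting

    def submit(self, fn, *args):
        now = time.time()
        with self._lock:
            stale = [name for name, since in self._idle_since.items()
                     if now - since > self._idle_timeout]
            for name in stale:
                del self._idle_since[name]
            spawn = not self._idle_since   # nobody is waiting for work
        self._queue.put((fn, args))        # real work first...
        for _ in stale:
            self._queue.put(None)          # ...then one pill per stale worker
        if spawn:
            t = threading.Thread(target=self._worker)
            t.daemon = True
            t.start()

    def _worker(self):
        name = threading.current_thread().name
        while True:
            with self._lock:
                self._idle_since[name] = time.time()
            item = self._queue.get()
            with self._lock:
                self._idle_since.pop(name, None)
            if item is None:               # retired by the reap check
                return
            fn, args = item
            fn(*args)
```

The worker's wait is a plain untimed `Queue.get`, avoiding the timed-wait path that the profiling earlier in the thread suggests is expensive; the trade-off is that reaping only happens when new work arrives.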
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17073142#comment-17073142 ] Luke Cwik commented on BEAM-8944:
-

I took another look at the implementation of the UnboundedThreadPoolExecutor to see if I could reduce the overhead and got to https://github.com/lukecwik/incubator-beam/commit/4cabd7e11c35b67901ba19a55d7f8bc6a585be3b, but it doesn't have the same strict guarantees that a thread will exit after getting no work for a certain amount of time and that a new thread will be created when necessary.
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072860#comment-17072860 ] Maximilian Michels commented on BEAM-8944:
--

I see that the usage pattern in the description is a bit different, but since the regression is caused by the new thread pool logic, I posted my findings here.

{quote}Is Flink now able to schedule much more work, since the Python SDK harness will run an unbounded number of tasks due to the UnboundedThreadPool vs the fixed number it supported before (so increased contention)?{quote}
We are not primarily using the worker threads for parallelism. Instead, we use 16 separate environments per Flink task manager, which are served with work round-robin. In this particular pipeline there are 12 task slots per task manager and at most two fused stages per task slot. That makes for a maximum of 24 stages distributed across the 16 environments, so the environments will spawn more than one worker thread. I've tried increasing the lifetime of the threads, but that didn't have an effect.

{quote}Is it possible that ThreadPoolExecutor has a C++ implementation which is making up for the difference?{quote}
Possibly; I suppose we need to profile the execution.

{quote}Do you have timing information between the two runs that compares the two implementations? Do you have a graph showing how many threads are alive in the Python process between the two runs?{quote}
I don't have such data yet. I only have the repeatable data for checkpointing. Note, I was running this on both Python 2.7.6 and Python 3.6.8, which showed the same behavior.
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072842#comment-17072842 ] Luke Cwik commented on BEAM-8944:
-

The Python 3.5 implementation of the ThreadPoolExecutor: https://github.com/python/cpython/blob/3.5/Lib/concurrent/futures/thread.py
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072840#comment-17072840 ] Luke Cwik commented on BEAM-8944:
-

So the original analysis done in the description of the bug isn't typical usage, because it compares creating `M` threads which each process one item vs creating a fixed number of threads which then process the items off a queue. It is easy to see that the performance difference will be in the number of threads created. It is expected that in typical usage threads would be reused many times before being garbage collected.

Max, for your analysis in the above comment: Is Flink now able to schedule much more work, since the Python SDK harness will run an unbounded number of tasks due to the UnboundedThreadPool vs the fixed number it supported before (so increased contention)? Is it possible that ThreadPoolExecutor has a C++ implementation which is making up for the difference? Do you have timing information between the two runs that compares the two implementations? Do you have a graph showing how many threads are alive in the Python process between the two runs?
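The point above (the original benchmark largely measures thread creation, not steady-state dispatch) can be made concrete with a small hypothetical comparison: one thread per item versus a single long-lived worker fed through a queue:

```python
import queue
import threading
import time


def thread_per_item(n):
    """Create (and join) one thread per item: roughly what a pool that
    never reuses threads would pay per task."""
    start = time.time()
    for i in range(n):
        t = threading.Thread(target=hash, args=(i,))
        t.start()
        t.join()
    return time.time() - start


def reused_worker(n):
    """Push all items through one long-lived worker thread."""
    q = queue.Queue()

    def worker():
        while True:
            item = q.get()
            if item is None:   # sentinel: no more items
                return
            hash(item)

    t = threading.Thread(target=worker)
    t.start()
    start = time.time()
    for i in range(n):
        q.put(i)
    q.put(None)
    t.join()
    return time.time() - start


if __name__ == '__main__':
    n = 500
    print('thread per item: %.3fs' % thread_per_item(n))
    print('reused worker:   %.3fs' % reused_worker(n))
```

On a typical machine the per-item variant is dominated by thread start-up and teardown, which is the cost the description's benchmark charges to the unbounded pool.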
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072693#comment-17072693 ] Maximilian Michels commented on BEAM-8944:
--

!checkpoint-duration.png!
yellow: 2.18.0 as released
turquoise: 2.18.0 with BEAM-8944 reverted
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072103#comment-17072103 ] Maximilian Michels commented on BEAM-8944:
--

Essentially, yes. This is especially a concern for latency, because Flink has to hold back in-flight elements while performing the checkpoint alignment of the operators. It appears the alignment is off because the Python bundles are not completing in time; I'm not sure why that is the case. We use the load-balancing feature of DefaultJobBundleFactory, where we use 16 Python environments and round-robin them.
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071998#comment-17071998 ] Yichi Zhang commented on BEAM-8944:
---
[~mxm] the lower checkpoint duration means that the FlinkRunner is faster without UnboundedThreadPoolExecutor?

> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---
>
> Key: BEAM-8944
> URL: https://issues.apache.org/jira/browse/BEAM-8944
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.18.0
> Reporter: Yichi Zhang
> Priority: Critical
> Fix For: 2.18.0
>
> Attachments: profiling.png, profiling_one_thread.png, profiling_twelve_threads.png
>
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> We are seeing a performance degradation for the Python streaming word count load tests.
>
> After some investigation, it appears to be caused by swapping the original ThreadPoolExecutor for UnboundedThreadPoolExecutor in the SDK worker. The suspicion is that Python performance is worse with more threads on CPU-bound tasks.
>
> A simple test comparing the performance of the different thread pool executors:
>
> {code:python}
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 100
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results (the executor reprs were truncated in the original; listed in the order the executors run above):
> UnboundedThreadPoolExecutor uses 268.160675049
> ThreadPoolExecutor(max_workers=1) uses 79.904583931
> ThreadPoolExecutor(max_workers=12) uses 191.179054976
>
> Profiling:
> UnboundedThreadPoolExecutor:
> !profiling.png!
> 1 Thread ThreadPoolExecutor:
> !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor:
> !profiling_twelve_threads.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
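The benchmark above depends on Beam's UnboundedThreadPoolExecutor, so it isn't runnable standalone. The following stdlib-only sketch (a hypothetical simplified workload, not the original test) demonstrates the same effect being measured: for CPU-bound pure-Python work, the GIL means extra threads add contention rather than parallelism.

```python
import time
from concurrent import futures


def cpu_task(n):
    # Pure-Python, CPU-bound work: threads contend on the GIL, so adding
    # threads adds context switching rather than throughput.
    total = 0
    for i in range(50000):
        total += hash((n, i))
    return total


def run_perf(executor, num_tasks=200):
    # Submit all tasks, wait for completion, and return elapsed seconds.
    start = time.time()
    pending = [executor.submit(cpu_task, i) for i in range(num_tasks)]
    futures.wait(pending)
    return time.time() - start


if __name__ == '__main__':
    for workers in (1, 12):
        with futures.ThreadPoolExecutor(max_workers=workers) as executor:
            print('%2d workers: %.3fs' % (workers, run_perf(executor)))
```

On a typical CPython interpreter the 1-worker run finishes faster than the 12-worker run, mirroring the numbers reported in the issue.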
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001802#comment-17001802 ] Yichi Zhang commented on BEAM-8944:
---
I think so
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001589#comment-17001589 ] Ahmet Altay commented on BEAM-8944:
---
Could this be closed after the cherry-pick PR ([https://github.com/apache/beam/pull/10430])?
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000448#comment-17000448 ] Yichi Zhang commented on BEAM-8944:
---
Then yeah, it'll affect Python streaming jobs (which run only on the portable runner with the Fn API).
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000445#comment-17000445 ] Udi Meiri commented on BEAM-8944:
---
The point of my question is to figure out whether this issue is a 2.18 blocker or not, and how it affects users.
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000443#comment-17000443 ] Udi Meiri commented on BEAM-8944:
---
By "current" I don't mean the current Beam release but the current production-ready runners (for example, IIUC portability on Dataflow is not production-ready).
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17000313#comment-17000313 ] Udi Meiri commented on BEAM-8944:
---
Does this bug affect current production runners (such as the Dataflow runner mentioned in the TODO in #10387)?
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16994104#comment-16994104 ] Yichi Zhang commented on BEAM-8944:
---
It seems the original change was made to solve a deadlock/stuckness issue. While the use of UnboundedThreadPoolExecutor does noticeably impact pipelines that saturate CPU usage (~90%), it has less effect on under-loaded pipelines.
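For context on that deadlock motivation, here is a minimal hypothetical sketch (stdlib only, not Beam code) of the failure class a bounded pool can hit: a task that blocks on another task submitted to the same pool can exhaust every worker, which an unbounded pool avoids by always having a spare thread.

```python
from concurrent import futures


def outer(pool, timeout=None):
    # Submit a dependent task to the same pool and block on its result.
    # With max_workers=1 the inner task can never start: the only worker
    # is busy running outer, which is itself waiting on inner.
    inner = pool.submit(lambda: 42)
    return inner.result(timeout=timeout)


bounded = futures.ThreadPoolExecutor(max_workers=1)
try:
    # The timeout stands in for what would otherwise hang forever.
    bounded.submit(outer, bounded, 1).result()
except futures.TimeoutError:
    print('bounded pool deadlocked (broken here only by the timeout)')
bounded.shutdown()

# With a spare worker available, inner can run and outer completes.
roomy = futures.ThreadPoolExecutor(max_workers=2)
print(roomy.submit(outer, roomy).result())  # prints 42
roomy.shutdown()
```

This is only an illustration of the general pattern; the actual stuckness the Beam change addressed may differ in its details.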
[jira] [Commented] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993141#comment-16993141 ] Yichi Zhang commented on BEAM-8944:
---
CC: [~angoenka]