Introduction
This is the next article in my testing series. The previous one introduced monkeypatching as a way to abstract away complexity. This article also uses monkeypatching, this time to solve a more complex problem. Most developers are familiar with testing synchronous code: techniques such as mocking and stubbing, together with the 'pure fabrication' principle (more on that later), form a fairly robust framework for covering regular code. However, sometimes we have to deal with highly concurrent programs that are also non-deterministic. In this article, I will analyze one such scenario.
Problem statement
Consider the problem of finding the definite integral of a function f on the closed interval [a, b]. Denote this value as S.
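In the notation of this article, the value we are looking for is:

```latex
S = \int_a^b f(x)\,dx
```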
We want to implement a function which evaluates the integral.
Math recap
The definite integral of a function can be visualized as the area under the function's curve. Therefore, to get a numerical estimation, we only need to calculate the area below the curve and above the X axis.
Solution
We can implement the function using a Monte Carlo simulation. Here is a simplified overview of the algorithm:
- Enclose the function's graph over the integration interval in a rectangle.
- Specify the number of tests.
- In each test, 'throw' a random point into the rectangle.
- If the point lands below the curve, increment the hit counter.
- Finally, compute the integral as num_hit / num_tests * rect_area.
From a programming perspective, we want to implement a function with this signature:
```python
def mc_integrate(f, a, b, y_min, y_max):
    """
    Calculates the definite integral of the function 'f' on the
    interval [a, b] using a Monte Carlo simulation,
    with y = y_min being the lower and y = y_max the upper
    bound of the rectangle.
    """
    ...
```
The function can then be called like this:

```python
import math

i = mc_integrate(math.sin, 0, math.pi / 2, 0, 1)
```

In this case, i would be a numerical approximation of the integral, a value close to 1.
Synchronous implementation
Let's implement the function in a naive, synchronous way:
```python
import random

def mc_integrate(f, a, b, y_min, y_max):
    num_tests = 100_000
    rect_area = (b - a) * (y_max - y_min)
    points = ((random.uniform(a, b), random.uniform(y_min, y_max)) for _ in range(num_tests))
    num_under_curve = sum(y <= f(x) for x, y in points)
    return num_under_curve / num_tests * rect_area
```
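To sanity-check the implementation, we can compare its output against an integral whose exact value we know. The snippet below restates mc_integrate from above and checks \(\int_0^1 x^2\,dx = 1/3\); the tolerance of 0.01 is an assumption based on the error figures later in the article:

```python
import random

# mc_integrate as defined above
def mc_integrate(f, a, b, y_min, y_max):
    num_tests = 100_000
    rect_area = (b - a) * (y_max - y_min)
    points = ((random.uniform(a, b), random.uniform(y_min, y_max)) for _ in range(num_tests))
    num_under_curve = sum(y <= f(x) for x, y in points)
    return num_under_curve / num_tests * rect_area

# The exact value of the integral of x^2 over [0, 1] is 1/3
estimate = mc_integrate(lambda x: x * x, 0, 1, 0, 1)
assert abs(estimate - 1 / 3) < 0.01
```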
Now, we have to come up with a reasonable value for num_tests.
I have considered several possible values and benchmarked them; the data in the table below is averaged over 100 runs.
| num_tests | average runtime (ms) | average error |
|---|---|---|
| \(10,000\) | \(20\) | \(2\%\) |
| \(100,000\) | \(60\) | \(1\%\) |
| \(1,000,000\) | \(500\) | \(0.1\%\) |
| \(10,000,000\) | \(5,000\) | \(0.01\%\) |
| \(50,000,000\) | \(30,000\) | \(0.001\%\) |
As you can see, 50,000,000 tests yields accurate results; however, the execution time is significant.
We can fix that.
Parallelism kicks in
Let's use all the CPU cores our machine has. We can get this number by calling:

```python
>>> import multiprocessing
>>> multiprocessing.cpu_count()
24
```
So, we have this code:
```python
import random
import multiprocessing

def mc_integrate(f, a, b, y_min, y_max):
    num_tests = 50_000_000
    num_cpus = multiprocessing.cpu_count()
    tests_per_process = num_tests // num_cpus
    # guard against num_tests not being divisible by num_cpus
    num_tests = tests_per_process * num_cpus
    rect_area = (b - a) * (y_max - y_min)
    with multiprocessing.Pool(num_cpus) as pool:
        num_under_curve_per_process = pool.starmap(
            perform_tests,
            [(tests_per_process, f, a, b, y_min, y_max)] * num_cpus
        )
    total_num_under_curve = sum(num_under_curve_per_process)
    return total_num_under_curve / num_tests * rect_area

def perform_tests(num_tests, f, a, b, y_min, y_max):
    points = ((random.uniform(a, b), random.uniform(y_min, y_max)) for _ in range(num_tests))
    num_under_curve = sum(y <= f(x) for x, y in points)
    return num_under_curve
```
And now the code executes in 100 milliseconds on average.
Testing
Now, we might want to cover the code with unit tests. Two problems make this harder:
- The code is not deterministic - several runs of the same function with the same parameters yield different results.
- The code spawns new processes and executes some code in them.
Let's address these problems one by one.
Non-determinism
Since the Monte Carlo simulation only approximates the integral, we cannot be sure of the exact value it will return,
so we cannot simply come up with some constant C to satisfy an assertion like:

```python
assert mc_integrate(f, a, b, y_min, y_max) == C
```
Why?
The function uses randomly generated points for the simulation. For example, let's try calculating \(\int_0^1 x^2\,dx\). The exact answer is \(1/3 \approx 0.3333\).

```python
def f(x):
    return x * x

S1 = mc_integrate(f, 0, 1, 0, 1)
S2 = mc_integrate(f, 0, 1, 0, 1)
```

Two runs give close, but different, results:

S1 = 0.33325338
S2 = 0.33324428
However, we can validate the correctness of the calculation by forcing the random generator to always return the same series of numbers - this is called seeding.
Let's have a look at this piece of code:
```python
import random

random.seed(42)
print([random.randint(0, 100) for _ in range(10)])
```
And let's run it twice
[81, 14, 3, 94, 35, 31, 28, 17, 94, 13]
[81, 14, 3, 94, 35, 31, 28, 17, 94, 13]
As you can see, the seed forced the generator to return the same sequence of random numbers. Let's try the same with the Monte Carlo simulation:
```python
import random

random.seed(42)
S1 = mc_integrate(f, 0, 1, 0, 1)
S2 = mc_integrate(f, 0, 1, 0, 1)
```
And let's check
S1 = 0.33346894
S2 = 0.33283536
It did not work. Why?
Because the code is parallel. We are generating the same sequence of numbers, but we can never be sure at which point a given number is generated, or in which process. The random generator just yields a sequence of numbers. Say we only want to generate four numbers; with our seed, they are 1, 2, 3, 4. We want to create two points from these numbers. We might get (1, 2) and (3, 4), but equally (1, 4) and (2, 3). The sequence is the same, but the places where the points land differ. We need to inject a separate seed into each process.
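This pairing ambiguity can be illustrated with a deterministic toy example (my own sketch, not the actual multiprocessing behaviour): the same four numbers yield different points depending on which process consumes which number:

```python
import random

random.seed(42)
nums = [random.uniform(0, 1) for _ in range(4)]

# One process consuming the sequence in order:
points_a = [(nums[0], nums[1]), (nums[2], nums[3])]
# Two processes interleaving their reads of the same sequence:
points_b = [(nums[0], nums[3]), (nums[1], nums[2])]

# Same underlying numbers...
assert sorted(n for p in points_a for n in p) == sorted(n for p in points_b for n in p)
# ...but different points
assert set(points_a) != set(points_b)
```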
Solution
Let's say we are running our simulation on \(N\) processes. Then we need a deterministic set of \(N\) seeds. Of course, we could use a single seed and set it in all subprocesses, but then every process would perform exactly the same calculations, and we would no longer be testing the way they cooperate towards the final result.
Let me share the updated code:
```python
import random
import os
import multiprocessing

def _get_seed(i, start_seed):
    if start_seed is None:
        return os.urandom(16)
    return start_seed + i

def mc_integrate(f, a, b, y_min, y_max, start_seed=None):
    num_tests = 50_000_000
    num_cpus = multiprocessing.cpu_count()
    tests_per_process = num_tests // num_cpus
    # guard against num_tests not being divisible by num_cpus
    num_tests = tests_per_process * num_cpus
    rect_area = (b - a) * (y_max - y_min)
    with multiprocessing.Pool(num_cpus) as pool:
        params = [
            (tests_per_process, f, a, b, y_min, y_max, _get_seed(i, start_seed))
            for i in range(num_cpus)
        ]
        num_under_curve_per_process = pool.starmap(perform_tests, params)
    total_num_under_curve = sum(num_under_curve_per_process)
    return total_num_under_curve / num_tests * rect_area

def perform_tests(num_tests, f, a, b, y_min, y_max, seed):
    random.seed(seed)
    points = ((random.uniform(a, b), random.uniform(y_min, y_max)) for _ in range(num_tests))
    num_under_curve = sum(y <= f(x) for x, y in points)
    return num_under_curve
```
Now, calling the function without start_seed behaves as before. However, once we specify start_seed, we get deterministic results:
0.33339372
0.33339372
Parallelism
Old school developers would argue that unit tests should not spawn subprocesses because they are hard to debug and can cause events like deadlocks that can carry over into subsequent tests, causing failure of unrelated parts of the test suite. Common approaches include:
- Mocking out the multiprocessing module and only verifying that the correct functions were called
- Substituting the concurrent implementation with a synchronous one for testing
Both of these approaches achieve the necessary code coverage; however, neither of them tests the actual cooperation between processes.
I suggest a different approach: let's just run the concurrent code in the unit tests. It does not cause much harm, as long as we understand the risks. Let's consider them one by one.
Deadlocks
Parallel code can block completely if two processes or threads wait on each other. However, we do not have this problem in our case, as the processes are completely independent.
Resource usage
Processes are heavy in terms of both memory and CPU. We need to remember that the machine on which the unit tests are executed (most likely a CI/CD runner) is usually far less powerful than your workstation. By spawning too many processes, we might cause problems such as out-of-memory errors or CPU throttling.
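One possible mitigation, sketched below as my own suggestion rather than code from this article, is to cap the number of worker processes in constrained environments (the limit of 4 is an assumption):

```python
import multiprocessing

def capped_num_cpus(limit=4):
    # Use at most `limit` worker processes so that a weak
    # CI/CD runner is not overwhelmed by the test suite
    return min(multiprocessing.cpu_count(), limit)

num_workers = capped_num_cpus()
assert 1 <= num_workers <= 4
```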
Performance issues
As described before, the test runner probably has significantly less computational power than our working machine. In our case, \(50,000,000\) Monte Carlo tests might be fine for a production deployment on a machine with many CPUs, but too heavy for the test runner.
Solution
Let's rework the original code so that the values of num_tests and num_cpus come from function calls, which we can then monkeypatch in tests.
```python
import random
import os
import multiprocessing

def _get_seed(i, start_seed):
    if start_seed is None:
        return os.urandom(16)
    return start_seed + i

def _get_num_cpus():
    return multiprocessing.cpu_count()

def _get_num_tests():
    return 50_000_000

def mc_integrate(f, a, b, y_min, y_max, start_seed=None):
    num_tests = _get_num_tests()
    num_cpus = _get_num_cpus()
    tests_per_process = num_tests // num_cpus
    # guard against num_tests not being divisible by num_cpus
    num_tests = tests_per_process * num_cpus
    rect_area = (b - a) * (y_max - y_min)
    with multiprocessing.Pool(num_cpus) as pool:
        params = [
            (tests_per_process, f, a, b, y_min, y_max, _get_seed(i, start_seed))
            for i in range(num_cpus)
        ]
        num_under_curve_per_process = pool.starmap(perform_tests, params)
    total_num_under_curve = sum(num_under_curve_per_process)
    return total_num_under_curve / num_tests * rect_area

def perform_tests(num_tests, f, a, b, y_min, y_max, seed):
    random.seed(seed)
    points = ((random.uniform(a, b), random.uniform(y_min, y_max)) for _ in range(num_tests))
    num_under_curve = sum(y <= f(x) for x, y in points)
    return num_under_curve
```
Now, given that our code is deterministic, we can write a minimal test that exercises the complete Monte Carlo simulation with minimal performance overhead:
```python
from montecarlo import mc_integrate

def f(x):
    return x * x

def stub_get_num_cpus():
    return 2

def stub_get_num_tests():
    return 10_000

def test_mc_integrate(mocker):
    mocker.patch('montecarlo._get_num_cpus', stub_get_num_cpus)
    mocker.patch('montecarlo._get_num_tests', stub_get_num_tests)
    assert mc_integrate(f, 0, 1, 0, 1, start_seed=10) == 0.3297
```
That's it. The test is now blazingly fast, memory-efficient, and deterministic: it will pass every time, as long as the algorithm remains intact.
Summary
Testing concurrent and non-deterministic code is often complex and requires creativity. However, taking the time to come up with a good solution will benefit you in the long run. Clear, deterministic tests let you put higher confidence in your code and serve as good documentation. Not mocking out multiprocessing completely ensures that your code is tested end to end.