import numpy as np

def remove_i(x, i):
    """Drops the ith element of an array."""
    shape = (x.shape[0]-1,) + x.shape[1:]
    y = np.empty(shape, dtype=float)
    y[:i] = x[:i]
    y[i:] = x[i+1:]
    return y

def a(i, x, G, m):
    """The acceleration of the ith mass."""
    x_i = x[i]
    x_j = remove_i(x, i)  # don't compute on itself
    m_j = remove_i(m, i)
    diff = x_j - x_i
    mag3 = np.sum(diff**2, axis=1)**1.5
    # compute acceleration on ith mass
    result = G * np.sum(diff * (m_j / mag3)[:,np.newaxis], axis=0)
    return result
Introduction to parallel computing (in Python)
If you want more, check out CS 475/575 Parallel Programming.
Approaching parallelism
- How do I do many things at once?
- How do I do one thing faster?
Computer: perform many simultaneous tasks
Developer: determine dependencies between code and data
Up until now: serial thinking, like “x then y then z”
Benefit of parallelism: problems execute faster—sometimes much faster.
Downside of parallelism: harder to program and debug; even simple tasks like opening files or printing to the screen become trickier.
Why go parallel?
First: wait until your code works and you actually need it to run faster.
- The problem creates or requires too much data for a normal machine.
- The sun would explode before the computation would finish.
- The algorithm is easy to parallelize.
- The physics itself cannot be simulated at all with smaller resources.
Scale and scalability
Scale: size of problem
Scale is proportional to the number of processes \(P\) used, and thus to the maximum degree of parallelism possible.
FLOPS: number of floating-point operations per second
Scaling up code
Scale up slowly—start with one processor, then 10, 100, etc.
scalability: how easy or hard it is to scale code
Strong scaling
strong scaling: how runtime changes as a function of processor number for a fixed total problem size.
speedup \(s\): the ratio of the time on one processor (\(t_1\)) to the time on \(P\) processors (\(t_P\)):
\[ s(P) = \frac{t_1}{t_P} \]
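A quick illustrative calculation (hypothetical timings, not from the slides): a code that takes 100 s on one processor and 12.5 s on 16 processors has a speedup of 8, i.e., half of the ideal linear speedup:
t_1, t_P, P = 100.0, 12.5, 16   # hypothetical timings
s = t_1 / t_P                   # speedup s(P) = t_1 / t_P
efficiency = s / P              # fraction of the ideal (linear) speedup
print(s, efficiency)            # 8.0, 0.5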
Strong scaling
Efficient system: linear strong scaling speedup.
For example, the PyFR computational fluid dynamics code demonstrates near-linear strong scaling.
Weak scaling
weak scaling: how runtime changes as a function of processor number for a fixed problem size per processor
sizeup \(z\) for a problem size \(N\), where \(N_P\) is the problem size used with \(P\) processors:
\[ z(P) = \frac{t_1}{t_P} \times \frac{N_P}{N_1} \]
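A quick illustrative calculation (hypothetical numbers, not from the slides): if the problem size per processor is held fixed, so \(N_P = P N_1\), and the runtime stays nearly flat, the sizeup grows almost linearly:
t_1, N_1 = 100.0, 1_000_000    # hypothetical time and problem size on 1 processor
t_P, P = 110.0, 16             # time on 16 processors with 16x the total problem size
N_P = P * N_1
z = (t_1 / t_P) * (N_P / N_1)  # sizeup z(P)
print(z)                       # ~14.5, close to the ideal value of 16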
Weak scaling
Efficient system: linear sizeup, or equivalently, constant runtime as processors and problem size grow together.
For example, the PyFR computational fluid dynamics code also demonstrates efficient weak scaling.
Amdahl’s Law
Some fraction of an algorithm, \(α\), cannot be parallelized.
Then, the maximum speedup/sizeup for \(P\) processors is:
\[ \max( s(P) ) = \frac{1}{\alpha + \frac{1-\alpha}{P}} \]
The theoretical maximum speedup, as \(P \to \infty\), is:
\[ \max(s) = \frac{1}{\alpha} \]
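For instance (an illustrative calculation, not from the slides): with a serial fraction of α = 0.1, no number of processors can push the speedup past 10:
def amdahl_speedup(alpha, P):
    """Maximum speedup for serial fraction alpha on P processors."""
    return 1.0 / (alpha + (1.0 - alpha) / P)

for P in [1, 10, 100, 1000]:
    print(P, amdahl_speedup(0.1, P))  # approaches 1/alpha = 10 as P grows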
Types of problems
embarrassingly parallel: algorithms with a high degree of independence and little communication between parts
Examples: summing large arrays, matrix multiplication, Monte Carlo simulations, some optimization approaches (e.g., stochastic and genetic algorithms)
Other algorithms have unavoidable bottlenecks: inverting a matrix, ODE integration, etc.
Not all hope is lost! Some parts may still benefit from parallelization.
Example: N-body problem
Generalization of the classic two-body problem that governs the equations of motion for two masses. From Newton’s law of gravity:
\[ \frac{dp}{dt} = G \frac{m_1 m_2}{r^2} \]
Equations of motion
\[ \begin{align} \mathbf{x}_{i,s} &= G \sum_{j=1, i \neq j} \frac{m_j (\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1})}{||\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1}||^3} \Delta t^2 + \mathbf{v}_{i,s-1} \Delta t + \mathbf{x}_{i,s-1} \\ \mathbf{v}_{i,s} &= G \sum_{j=1, i \neq j} \frac{m_j (\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1})}{||\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1}||^3} \Delta t + \mathbf{v}_{i,s-1} \end{align} \]
No parallelism
def timestep(x0, v0, G, m, dt):
    """Computes the next position and velocity for all masses given
    initial conditions and a time step size.
    """
    N = len(x0)
    x1 = np.empty(x0.shape, dtype=float)
    v1 = np.empty(v0.shape, dtype=float)
    for i in range(N):  # update locations for all masses each step
        a_i0 = a(i, x0, G, m)
        v1[i] = a_i0 * dt + v0[i]
        x1[i] = a_i0 * dt**2 + v0[i] * dt + x0[i]
    return x1, v1
def initial_cond(N, D):
    """Generates initial conditions for N unity masses at rest
    starting at random positions in D-dimensional space.
    """
    x0 = np.random.rand(N, D)  # use random initial locations
    v0 = np.zeros((N, D), dtype=float)
    m = np.ones(N, dtype=float)
    return x0, v0, m
Generating initial conditions and taking one timestep:
x0, v0, m = initial_cond(10, 2)
x1, v1 = timestep(x0, v0, 1.0, m, 1.0e-3)
Driver function that simulates \(S\) time steps:
def simulate(N, D, S, G, dt):
    x0, v0, m = initial_cond(N, D)
    for s in range(S):
        x1, v1 = timestep(x0, v0, G, m, dt)
        x0, v0 = x1, v1
Measuring performance
import time
Ns = [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096]
runtimes = []
for N in Ns:
    start = time.time()
    simulate(N, 3, 300, 1.0, 1e-3)
    stop = time.time()
    runtimes.append(stop - start)

import matplotlib.pyplot as plt
plt.plot(Ns, runtimes, 'o')
plt.grid(True)
plt.show()
Does the problem scale quadratically?
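One way to check (a sketch reusing the Ns and runtimes measured above) is to fit a line to log(runtime) versus log(N); a slope near 2 indicates roughly quadratic scaling:
logN = np.log(Ns)
logt = np.log(runtimes)
slope, intercept = np.polyfit(logN, logt, 1)  # least-squares fit in log-log space
print(slope)   # a value near 2 suggests O(N**2) runtime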
Threads
- Threads perform work and are not blocked by work of other threads.
- Threads can communicate with each other through shared state.
Threads: probably don't use them in Python.
- All threads execute in the same process as Python itself, and the Global Interpreter Lock (GIL) lets only one thread run Python bytecode at a time, so threads do not speed up CPU-bound work.
- Where to use? High-latency tasks where the waiting time can be spent on another task (e.g., downloading a large file); see the sketch below.
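As a sketch of that I/O-bound case (simulating network latency with time.sleep, which releases the GIL while waiting), threads can overlap the waiting time:
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    time.sleep(1.0)          # stand-in for waiting on the network
    return url, 'done'

urls = ['file_{}'.format(i) for i in range(8)]
start = time.time()
with ThreadPoolExecutor(max_workers=8) as ex:
    results = list(ex.map(fake_download, urls))
print(time.time() - start)   # roughly 1 s with 8 threads, versus ~8 s serially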
N-body with threading
from threading import Thread
class Worker(Thread):
    """Computes x, v, and a of the ith body."""
    def __init__(self, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.inputs = []      # buffer of work for threads
        self.results = []     # buffer of return values
        self.running = True   # setting to False causes run() to end safely
        self.daemon = True    # allow thread to be terminated with Python
        self.start()

    def run(self):
        while self.running:
            if len(self.inputs) == 0:  # check for work
                continue
            i, x0, v0, G, m, dt = self.inputs.pop(0)
            a_i0 = a(i, x0, G, m)  # body of original timestep()
            v_i1 = a_i0 * dt + v0[i]
            x_i1 = a_i0 * dt**2 + v0[i] * dt + x0[i]
            result = (i, x_i1, v_i1)
            self.results.append(result)
Thread pools
Creating and starting threads is expensive, so create a pool of workers that persists.
class Pool(object):
    """A collection of P worker threads that distributes tasks
    evenly across them.
    """
    def __init__(self, size):
        self.size = size
        # create new workers based on size
        self.workers = [Worker() for p in range(size)]

    def do(self, tasks):
        for p in range(self.size):
            self.workers[p].inputs += tasks[p::self.size]  # evenly distribute tasks
        while any([len(worker.inputs) != 0 for worker in self.workers]):
            pass  # wait for all workers to finish
        results = []
        for worker in self.workers:  # get results back from workers
            results += worker.results
            worker.results.clear()
        return results  # return complete list of results for all inputs

    def __del__(self):  # stop workers when pool is shut down
        for worker in self.workers:
            worker.running = False
def timestep(x0, v0, G, m, dt, pool):
    """Computes the next position and velocity for all masses given
    initial conditions and a time step size.
    """
    N = len(x0)
    tasks = [(i, x0, v0, G, m, dt) for i in range(N)]  # create task for each body
    results = pool.do(tasks)  # run tasks
    x1 = np.empty(x0.shape, dtype=float)
    v1 = np.empty(v0.shape, dtype=float)
    for i, x_i1, v_i1 in results:  # rearrange results (probably not in order)
        x1[i] = x_i1
        v1[i] = v_i1
    return x1, v1
def simulate(P, N, D, S, G, dt):
    x0, v0, m = initial_cond(N, D)
    pool = Pool(P)
    for s in range(S):
        x1, v1 = timestep(x0, v0, G, m, dt, pool)
        x0, v0 = x1, v1
How does the simulation perform as a function of \(P\)?
import time
Ps = [1, 2, 4, 8]
runtimes = []
for P in Ps:
    start = time.time()
    simulate(P, 64, 3, 300, 1.0, 1e-3)
    stop = time.time()
    runtimes.append(stop - start)

import matplotlib.pyplot as plt
plt.plot(Ps, runtimes, 'o')
plt.xlabel('Number of threads')
plt.ylabel('Runtime (s)')
plt.grid(True)
plt.show()
Multiprocessing
Multiprocessing is more like the threading available in other languages: separate processes side-step the GIL, so CPU-bound work can actually run in parallel.
Can't be used directly from the interactive interpreter; the main module must be importable by the child processes (see the guard sketch after simulate() below).
Provides a Pool class for us, used with its powerful map() function.
import numpy as np
from multiprocessing import Pool
def remove_i(x, i):
    """Drops the ith element of an array."""
    shape = (x.shape[0]-1,) + x.shape[1:]
    y = np.empty(shape, dtype=float)
    y[:i] = x[:i]
    y[i:] = x[i+1:]
    return y

def a(i, x, G, m):
    """The acceleration of the ith mass."""
    x_i = x[i]
    x_j = remove_i(x, i)  # don't compute on itself
    m_j = remove_i(m, i)
    diff = x_j - x_i
    mag3 = np.sum(diff**2, axis=1)**1.5
    # compute acceleration on ith mass
    result = G * np.sum(diff * (m_j / mag3)[:,np.newaxis], axis=0)
    return result
# function needs one argument
def timestep_i(args):
    """Worker function that computes the next position and velocity for the ith mass."""
    i, x0, v0, G, m, dt = args  # unpack arguments to original function
    a_i0 = a(i, x0, G, m)  # body of original timestep()
    v_i1 = a_i0 * dt + v0[i]
    x_i1 = a_i0 * dt**2 + v0[i] * dt + x0[i]
    return i, x_i1, v_i1
def timestep(x0, v0, G, m, dt, pool):
    """Computes the next position and velocity for all masses given
    initial conditions and a time step size.
    """
    N = len(x0)
    tasks = [(i, x0, v0, G, m, dt) for i in range(N)]
    results = pool.map(timestep_i, tasks)  # replace old do() with Pool.map()
    x1 = np.empty(x0.shape, dtype=float)
    v1 = np.empty(v0.shape, dtype=float)
    for i, x_i1, v_i1 in results:
        x1[i] = x_i1
        v1[i] = v_i1
    return x1, v1
def initial_cond(N, D):
    """Generates initial conditions for N unity masses at rest
    starting at random positions in D-dimensional space.
    """
    x0 = np.random.rand(N, D)  # use random initial locations
    v0 = np.zeros((N, D), dtype=float)
    m = np.ones(N, dtype=float)
    return x0, v0, m
def simulate(P, N, D, S, G, dt):
    x0, v0, m = initial_cond(N, D)
    pool = Pool(P)
    for s in range(S):
        x1, v1 = timestep(x0, v0, G, m, dt, pool)
        x0, v0 = x1, v1
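Because the main module must be importable by the child processes (as noted earlier), the driver call is typically placed behind a __main__ guard. A minimal sketch, assuming the code above lives in nbody_multiprocessing.py:
if __name__ == '__main__':
    # only the parent process runs the driver; child processes just import the module
    simulate(4, 512, 3, 300, 1.0, 1e-3)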
How does multiprocessing scale?
import numpy as np
import time
from nbody_multiprocessing import simulate
Ps = np.array([1, 2, 4, 6, 8, 10, 12, 14, 16])
runtimes = []
for P in Ps:
    start = time.time()
    simulate(P, 512, 3, 300, 1.0, 1e-3)
    stop = time.time()
    runtimes.append(stop - start)

import matplotlib.pyplot as plt
plt.plot(Ps, runtimes, 'o', label='Measured runtime')
plt.plot(Ps, runtimes[0] / Ps, label='Ideal scaling')
plt.xlabel('Number of processes')
plt.ylabel('Runtime (s)')
plt.legend()
plt.grid(True)
plt.show()
Other options for parallelism
MPI: Message-Passing Interface
Supercomputing is built on MPI.
MPI scales to hundreds of thousands, even millions, of processors.
Processes are called ranks and given integer identifiers; rank 0 is usually the "main" rank that coordinates the others. MPI is all about communication.
Can be used from Python, including with NumPy arrays, via mpi4py.
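A minimal sketch with mpi4py (assuming MPI and mpi4py are installed; the filename is arbitrary), where rank 0 sends a NumPy array to rank 1:
# hello_mpi.py -- run with: mpiexec -n 2 python hello_mpi.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()       # this process's integer identifier
size = comm.Get_size()       # total number of ranks

if rank == 0:
    data = np.arange(10, dtype=float)
    comm.Send(data, dest=1, tag=0)    # uppercase Send/Recv work on NumPy buffers
    print('rank 0 of', size, 'sent', data)
elif rank == 1:
    data = np.empty(10, dtype=float)
    comm.Recv(data, source=0, tag=0)
    print('rank 1 received', data)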
Numba
Numba is a just-in-time (JIT) compiler for Python and NumPy code.
It translates Python functions to optimized machine code at runtime using the LLVM compiler library; no separate compiler is required, just decorators.
It can also automatically parallelize loops (see the sketch after the example below).
from numba import jit
import random
@jit(nopython=True)
def monte_carlo_pi(nsamples):
    acc = 0
    for i in range(nsamples):
        x = random.random()
        y = random.random()
        if (x ** 2 + y ** 2) < 1.0:
            acc += 1
    return 4.0 * acc / nsamples
Takeaways
If your problem calls for it, consider parallelizing.
That said, do not jump straight to parallelism: smart NumPy array operations, appropriate data structures, or existing solutions may suffice.