Intro. to Parallel Computing

Software Development for Engineering Research


Kyle Niemeyer. 17 February 2022

ME 599, Corvallis, OR

Today: Introduction to parallel computing (in Python)


If you want more, check out CS 475/575 Parallel Programming.

Approaching parallelism:


  • How do I do many things at once?
  • How do I do one thing faster?

Computer: perform many simultaneous tasks

Developer: determine dependencies between code and data

Up until now: serial thinking, like "x then y then z"


Benefit of parallelism: problems execute faster—sometimes much faster.

Downside to parallelism: harder to program and debug; even simple tasks like opening files or printing to the screen become more complicated.

Why go parallel?


First: wait until your code works and you actually need more performance. Then go parallel if:


  • The problem creates or requires too much data for a normal machine.
  • The sun would explode before the computation would finish.
  • The algorithm is easy to parallelize.
  • The physics itself cannot be simulated at all with smaller resources.

Scale and Scalability


Scale and Scalability 'book' image

Scale and Scalability


Scale: size of problem

Scale is proportional to the number of processes used, \(P\), and thus to the maximum degree of parallelism possible.

FLOPS: number of floating-point operations per second

Scaling up code


Scale up slowly—start with one processor, then 10, 100, etc.

scalability: how easy or hard it is to scale code

strong scaling: how runtime changes as a function of processor number for a fixed total problem size

speedup \(s\): ratio of time on one processor, \(t_1\), to time on \(P\) processors, \(t_P\):

\[\begin{equation} s(P) = \frac{t_1}{t_P} \end{equation} \]
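
For example (with hypothetical timings): if a run takes \(t_1 = 100\) s on one processor and \(t_{16} = 8\) s on 16 processors, then

\[\begin{equation} s(16) = \frac{100}{8} = 12.5 \end{equation} \]

which falls short of the ideal speedup of 16 (a parallel efficiency of \(12.5/16 \approx 78\%\)).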

Strong scaling


Efficient system: linear strong scaling speedup. For example, PyFR CFD code:

pyFR strong scaling performance
Ref: P Vincent et al. "PyFR: Next-Generation High-Order Computational Fluid Dynamics on Many-Core Hardware." 22nd AIAA CFD Conference. 2015.

weak scaling: how runtime changes as a function of processor number for a fixed problem size per processor

sizeup \(z\), for a problem size \(N\):

\[\begin{equation} z(P) = \frac{t_1}{t_P} \times \frac{N_P}{N_1} \end{equation} \]

Weak scaling

Efficient system: linear sizeup, i.e., runtime stays roughly constant as processors and problem size grow together. For example, PyFR CFD code:

pyFR weak scaling performance
Ref: P Vincent et al. "PyFR: Next-Generation High-Order Computational Fluid Dynamics on Many-Core Hardware." 22nd AIAA CFD Conference. 2015.

Amdahl's Law

Some fraction of an algorithm, \(\alpha\), cannot be parallelized. Then, the maximum speedup/sizeup for \(P\) processors is:

\[\begin{equation} \max (s(P)) = \frac{1}{\alpha + \frac{1 - \alpha}{P}} \end{equation} \]

In the limit \(P \to \infty\), the theoretical maximum speedup is:

\[\begin{equation} \max (s) = \frac{1}{\alpha} \end{equation} \]
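
For example, if 10% of an algorithm is serial (\(\alpha = 0.1\)), the speedup can never exceed 10 no matter how many processors are used; with \(P = 8\),

\[\begin{equation} \max (s(8)) = \frac{1}{0.1 + \frac{1 - 0.1}{8}} \approx 4.7 \end{equation} \]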

Types of problems


embarrassingly parallel: algorithms with a high degree of independence and little communication between parts

Examples: summing large arrays, matrix multiplication, Monte Carlo simulations, some optimization approaches (e.g., stochastic and genetic algorithms)

Other algorithms have unavoidable bottlenecks: inverting a matrix, ODE integration, etc.

Not all hope is lost! Some parts may still benefit from parallelization.

Example: N-body problem


Generalization of the classic two-body problem that governs the equations of motion of two masses. From Newton's law of gravity, the rate of change of momentum (i.e., the force) is:

\[\begin{equation} \frac{dp}{dt} = G \frac{m_1 m_2}{r^2} \end{equation} \]

Equations of motion:


\[\begin{aligned} \mathbf{x}_{i,s} & = G \sum_{j=1, i \neq j} \frac{m_j (\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1})}{||\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1}||^3} \Delta t^2 + \mathbf{v}_{i,s-1} \Delta t + \mathbf{x}_{i,s-1} \\ \mathbf{v}_{i,s} & = G \sum_{j=1, i \neq j} \frac{m_j (\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1})}{||\mathbf{x}_{j,s-1} - \mathbf{x}_{i,s-1}||^3} \Delta t + \mathbf{v}_{i,s-1} \end{aligned} \]

No parallelism


                            import numpy as np

                            def remove_i(x, i):
                                """Drops the ith element of an array."""
                                shape = (x.shape[0]-1,) + x.shape[1:]
                                y = np.empty(shape, dtype=float)
                                y[:i] = x[:i]
                                y[i:] = x[i+1:]
                                return y

                            def a(i, x, G, m):
                                """The acceleration of the ith mass."""
                                x_i = x[i]
                                x_j = remove_i(x, i) # don't compute on itself
                                m_j = remove_i(m, i)
                                diff = x_j - x_i
                                mag3 = np.sum(diff**2, axis=1)**1.5
                                # compute acceleration on ith mass
                                result = G * np.sum(diff * (m_j / mag3)[:,np.newaxis], axis=0)
                                return result
                        

                            def timestep(x0, v0, G, m, dt):
                                """Computes the next position and velocity for all masses given
                                initial conditions and a time step size.
                                """
                                N = len(x0)
                                x1 = np.empty(x0.shape, dtype=float)
                                v1 = np.empty(v0.shape, dtype=float)
                                for i in range(N): # update locations for all masses each step
                                    a_i0 = a(i, x0, G, m)
                                    v1[i] = a_i0 * dt + v0[i]
                                    x1[i] = a_i0 * dt**2 + v0[i] * dt + x0[i]
                                return x1, v1

                            def initial_cond(N, D):
                                """Generates initial conditions for N unity masses at rest
                                starting at random positions in D-dimensional space.
                                """
                                x0 = np.random.rand(N, D) # use random initial locations
                                v0 = np.zeros((N, D), dtype=float)
                                m = np.ones(N, dtype=float)
                                return x0, v0, m
                        

                            x0, v0, m = initial_cond(10, 2)
                            x1, v1 = timestep(x0, v0, 1.0, m, 1.0e-3)
                        

Driver function that simulates \(S\) time steps:


                            def simulate(N, D, S, G, dt):
                                x0, v0, m = initial_cond(N, D)
                                for s in range(S):
                                    x1, v1 = timestep(x0, v0, G, m, dt)
                                    x0, v0 = x1, v1
                        

Measure performance:



                            import time
                            Ns = [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192]
                            runtimes = []
                            for N in Ns:
                                start = time.time()
                                simulate(N, 3, 300, 1.0, 1e-3)
                                stop = time.time()
                                runtimes.append(stop - start)
                        

Does the problem scale quadratically?
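
One quick check (a minimal sketch, assuming the Ns and runtimes lists from above): fit the slope of runtime vs. N on a log-log scale; a slope near 2 indicates quadratic scaling.

                            # fit log(runtime) vs. log(N); a slope near 2 suggests O(N^2) cost per run
                            slope, intercept = np.polyfit(np.log(Ns), np.log(runtimes), 1)
                            print('scaling exponent: {:.2f}'.format(slope))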

Threads


Threads perform work concurrently and are not blocked by the work of other threads.

Threads can communicate with each other through shared state.

Threads: probably don't use in Python.


All threads execute in the same process as Python itself; the Global Interpreter Lock (GIL) means only one thread runs Python bytecode at a time, so threads won't speed up CPU-bound code.

Where to use them? For high-latency tasks, where the time spent waiting can be used for another task (e.g., downloading a large file).
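
For example, a minimal sketch of overlapping several downloads with a thread pool (the URLs are hypothetical placeholders, not from the original example):

                            from concurrent.futures import ThreadPoolExecutor
                            from urllib.request import urlopen

                            def fetch(url):
                                """Download one file; the thread idles while waiting on the network."""
                                with urlopen(url) as response:
                                    return response.read()

                            urls = ['https://example.com/a.dat', 'https://example.com/b.dat'] # placeholders
                            with ThreadPoolExecutor(max_workers=4) as executor:
                                data = list(executor.map(fetch, urls)) # downloads overlap in time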

N-body with threading


                            from threading import Thread

                            class Worker(Thread):
                                """Computes x, v, and a of the ith body."""
                                def __init__(self, *args, **kwargs):
                                    super(Worker, self).__init__(*args, **kwargs)
                                    self.inputs = [] # buffer of work for threads
                                    self.results = [] # buffer of return values
                                    self.running = True # setting to False causes run() to end safely
                                    self.daemon = True # allow thread to be terminated with Python
                                    self.start()

                                def run(self):
                                    while self.running:
                                        if len(self.inputs) == 0: # check for work
                                            continue
                                        i, x0, v0, G, m, dt = self.inputs.pop(0)
                                        a_i0 = a(i, x0, G, m) # body of original timestep()
                                        v_i1 = a_i0 * dt + v0[i]
                                        x_i1 = a_i0 * dt**2 + v0[i] * dt + x0[i]
                                        result = (i, x_i1, v_i1)
                                        self.results.append(result)
                        

Thread pools

Creating and starting threads is expensive, so create a pool of workers that persists


                            class Pool(object):
                                """A collection of P worker threads that distributes tasks
                                evenly across them.
                                """
                                def __init__(self, size):
                                    self.size = size
                                    # create new workers based on size
                                    self.workers = [Worker() for p in range(size)]

                                def do(self, tasks):
                                    for p in range(self.size):
                                        self.workers[p].inputs += tasks[p::self.size] # evenly distribute tasks
                                    while any([len(worker.inputs) != 0 for worker in self.workers]):
                                        pass # wait for all workers to finish
                                    results = []
                                    for worker in self.workers: # get results back from workers
                                        results += worker.results
                                        worker.results.clear()
                                    return results # return complete list of results for all inputs

                                def __del__(self): # stop workers when pool is shut down
                                    for worker in self.workers:
                                        worker.running = False
                        

                            def timestep(x0, v0, G, m, dt, pool):
                                """Computes the next position and velocity for all masses given
                                initial conditions and a time step size.
                                """
                                N = len(x0)
                                tasks = [(i, x0, v0, G, m, dt) for i in range(N)] # create task for each body
                                results = pool.do(tasks) # run tasks
                                x1 = np.empty(x0.shape, dtype=float)
                                v1 = np.empty(v0.shape, dtype=float)
                                for i, x_i1, v_i1 in results: # rearrange results (probably not in order)
                                    x1[i] = x_i1
                                    v1[i] = v_i1
                                return x1, v1
                        

How does the simulation perform as a function of \(P\)?
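
The driver also needs a matching update so it creates the thread pool once and passes it to timestep(); a minimal sketch consistent with the call below:

                            def simulate(P, N, D, S, G, dt):
                                """Simulates S time steps of N bodies using a pool of P threads."""
                                x0, v0, m = initial_cond(N, D)
                                pool = Pool(P) # create worker threads once, reuse every step
                                for s in range(S):
                                    x1, v1 = timestep(x0, v0, G, m, dt, pool)
                                    x0, v0 = x1, v1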


                            Ps = [1, 2, 4, 8]
                            runtimes = []
                            for P in Ps:
                                start = time.time()
                                simulate(P, 64, 3, 300, 1.0, 1e-3)
                                stop = time.time()
                                runtimes.append(stop - start)
                        

Multiprocessing


multiprocessing is more like the threading available in other languages: each worker is a separate process, which sidesteps the GIL and allows true parallelism for CPU-bound work.

Can't be used directly from the interactive interpreter; the main module must be importable by the child processes

Provides a Pool class for us, used with its powerful map() method.


                            from multiprocessing import Pool

                            # function needs one argument
                            def timestep_i(args):
                                """Computes the next position and velocity for the ith mass."""
                                i, x0, v0, G, m, dt = args # unpack arguments to original function
                                a_i0 = a(i, x0, G, m) # body of original timestep()
                                v_i1 = a_i0 * dt + v0[i]
                                x_i1 = a_i0 * dt**2 + v0[i] * dt + x0[i]
                                return i, x_i1, v_i1
                        

                            def timestep(x0, v0, G, m, dt, pool):
                                """Computes the next position and velocity for all masses given
                                initial conditions and a time step size.
                                """
                                N = len(x0)
                                tasks = [(i, x0, v0, G, m, dt) for i in range(N)]
                                results = pool.map(timestep_i, tasks) # replace old do() with Pool.map()
                                x1 = np.empty(x0.shape, dtype=float)
                                v1 = np.empty(v0.shape, dtype=float)
                                for i, x_i1, v_i1 in results:
                                    x1[i] = x_i1
                                    v1[i] = v_i1
                                return x1, v1
                        

How does multiprocessing scale?
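
As before, the driver is assumed to create the process pool and hand it to timestep(); a minimal sketch:

                            def simulate(P, N, D, S, G, dt):
                                """Simulates S time steps of N bodies using a pool of P processes."""
                                x0, v0, m = initial_cond(N, D)
                                pool = Pool(P) # pool of P worker processes
                                for s in range(S):
                                    x1, v1 = timestep(x0, v0, G, m, dt, pool)
                                    x0, v0 = x1, v1
                                pool.close()
                                pool.join()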


                            import time
                            Ps = [1, 2, 4, 8]
                            runtimes = []
                            for P in Ps:
                                start = time.time()
                                simulate(P, 256, 3, 300, 1.0, 1e-3)
                                stop = time.time()
                                runtimes.append(stop - start)
                        

MPI: Message-Passing Interface

Supercomputing is built on MPI.

MPI scales up to hundreds of thousands, even millions, of processors.

Processes are called ranks and are given integer identifiers; rank 0 is usually the "master" that controls the other ranks. MPI is all about communication between ranks.

Can be used from Python, with NumPy array support, via mpi4py
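
A minimal mpi4py sketch (not from the original slides), run with something like mpiexec -n 4 python script.py; each rank sums part of a range and rank 0 collects the total:

                            from mpi4py import MPI

                            comm = MPI.COMM_WORLD
                            rank = comm.Get_rank() # this process's integer identifier
                            size = comm.Get_size() # total number of ranks

                            # each rank sums a disjoint slice of 0..999
                            local = sum(range(rank, 1000, size))
                            total = comm.reduce(local, op=MPI.SUM, root=0) # collect on rank 0

                            if rank == 0:
                                print('total =', total)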

numba: just-in-time compiler for Python/NumPy

translates Python functions to optimized machine code at runtime using the LLVM compiler

does not require a separate compiler, just decorators

can automatically parallelize loops (see the prange sketch after the example below)


                            from numba import jit
                            import random

                            @jit(nopython=True)
                            def monte_carlo_pi(nsamples):
                                acc = 0
                                for i in range(nsamples):
                                    x = random.random()
                                    y = random.random()
                                    if (x ** 2 + y ** 2) < 1.0:
                                        acc += 1
                                return 4.0 * acc / nsamples
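
For the automatic loop parallelization mentioned above, numba provides prange together with parallel=True; a minimal sketch (the function here is illustrative, not from the original slides):

                            from numba import njit, prange
                            import numpy as np

                            @njit(parallel=True)
                            def sum_of_squares(x):
                                total = 0.0
                                for i in prange(x.shape[0]): # iterations split across threads
                                    total += x[i] ** 2 # scalar reduction handled by numba
                                return total

                            sum_of_squares(np.random.rand(100000))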