- Setup
- How to profile code
- Look for “slow” sections
- Making improvements
- Single node parallel
  - Multiple CPUs, one computer
- Multi node parallel
  - Multiple CPUs, multiple computers
2024-03-01
Get presentation materials
git clone https://github.com/uschpc/workshop-hpc-python.git
module purge
module load conda
eval "$(conda shell.bash hook)"
conda env create -f environment.yml
examples/write.py
python -m cProfile -s tottime write.py -n 100
python -m cProfile -o write.log examples/write.py -n 100
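The profile saved with -o can also be inspected with the standard-library pstats module (snakeviz, below, gives an interactive view of the same file). A minimal sketch:

```python
import pstats

# Load the profile written by `python -m cProfile -o write.log ...`
stats = pstats.Stats("write.log")
# Print the 10 functions with the largest total time
stats.sort_stats("tottime").print_stats(10)
```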
snakeviz write.log
generate_data called 1000000 times
npyio.py called 100 times
This snippet is not great
def write_data(x, y, output, nFiles, fileID):
    # Output filename built from the template and file index
    filename = "output/%s%05d" % (output, fileID)
    i_max = len(x)
    j_max = len(y)
    data = np.zeros((i_max, j_max))
    # Fill the grid one point at a time: a slow, Python-level double loop
    for i in range(i_max):
        for j in range(j_max):
            data[i, j] = generate_data(x[i], y[j], nFiles, fileID)
    np.savetxt(filename, data)
where generate_data() is defined as:
def generate_data(x, y, n, t):
    value = (x/10)**2 * (y/10) + (y/10)**2 * (x/10) + t/n
    value = value % 1  # Keep number between 0 and 1
    return value
n - number of files to be generated
t - which file to generate
numpy.meshgrid(*xi, copy=True, sparse=False, indexing='xy')
Return coordinate matrices from coordinate vectors. Make N-D coordinate arrays for vectorized evaluations of N-D scalar/vector fields over N-D grids, given one-dimensional coordinate arrays x1, x2,…, xn.
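A tiny illustration of what meshgrid produces (values here are arbitrary), showing why the double loop can be replaced by a single array expression:

```python
import numpy as np

x = np.array([0, 1, 2])     # 3 points along x
y = np.array([10, 20])      # 2 points along y
X, Y = np.meshgrid(x, y)    # both have shape (2, 3)
# X repeats x along rows, Y repeats y down columns, so any elementwise
# expression on X and Y evaluates over the whole grid at once
print((X / 10)**2 * (Y / 10) + (Y / 10)**2 * (X / 10))
```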
def write_data(X, Y, output, nFiles, fileID):
    filename = "output/%s%05d" % (output, fileID)
    # One vectorized call evaluates the whole grid at once
    Z = generate_data(X, Y, nFiles, fileID)
    np.savetxt(filename, Z)
Where
x = np.arange(x_origin - size/2, x_origin + size/2, 1)
y = np.arange(y_origin - size/2, y_origin + size/2, 1)
X, Y = np.meshgrid(x, y)
examples/write_vectorized.py
python3 -m cProfile -s tottime examples/write_vectorized.py -n 100
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 100 | 0.870 | 0.009 | 1.145 | 0.011 | npyio.py:1191(savetxt) |
| 200 | 0.160 | 0.001 | 0.162 | 0.001 | {built-in method io.open} |
| 115 | 0.075 | 0.001 | 0.075 | 0.001 | {built-in method io.open_code} |
| 34/32 | 0.054 | 0.002 | 0.058 | 0.002 | {built-in method _imp.create_dynamic} |
| 10000 | 0.051 | 0.000 | 0.051 | 0.000 | {method 'write' of '_io.TextIOWrapper' objects} |
| 100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized.py:6(generate_data) |
Original write.py ~4.8s
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 1000000 | 2.854 | 0.000 | 2.854 | 0.000 | write.py:6(generate_data) |
| 100 | 0.749 | 0.007 | 1.011 | 0.010 | npyio.py:1191(savetxt) |
| 100 | 0.673 | 0.007 | 4.540 | 0.045 | write.py:11(write_data) |
New write_vectorized.py ~ 1.4s
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 100 | 0.870 | 0.009 | 1.145 | 0.011 | npyio.py:1191(savetxt) |
| 100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized.py:6(generate_data) |
| 100 | 0.001 | 0.000 | 1.191 | 0.012 | write_vectorized.py:11(write_data) |
np.savetxt(filename,data)
np.save(filename,data)
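np.save writes NumPy's raw binary .npy format, skipping the per-value text formatting that makes savetxt slow. A small round-trip sketch (file names here are illustrative, not from the workshop scripts):

```python
import numpy as np

data = np.random.rand(100, 100)
np.savetxt("grid.txt", data)   # plain text: every value formatted and written as characters
np.save("grid.npy", data)      # binary .npy: essentially a memory dump plus a small header

# The binary file reads back exactly, with no text parsing
loaded = np.load("grid.npy")
assert np.array_equal(loaded, data)
```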
write_vectorized.py ~1.4s
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 100 | 0.870 | 0.009 | 1.145 | 0.011 | npyio.py:1191(savetxt) |
| 100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized.py:6(generate_data) |
| 100 | 0.001 | 0.000 | 1.191 | 0.012 | write_vectorized.py:11(write_data) |
examples/write_vectorized_binary.py ~0.46s
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 100 | 0.174 | 0.002 | 0.174 | 0.002 | {built-in method io.open} |
| 100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized_binary.py:6(generate_data) |
| 100 | 0.001 | 0.000 | 0.220 | 0.002 | write_vectorized_binary.py:11(write_data) |
| 100 | 0.002 | 0.000 | 0.265 | 0.003 | npyio.py:457(save) |
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
with mp.Pool(processes=nWorkers) as pool:
    for i in range(0, nFiles):
        pool.apply(write_data, args=(X, Y, output, nFiles, i))
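Note that pool.apply blocks until each call finishes, so work submitted this way is handed out one task at a time. One common variation (a sketch, not necessarily how the workshop script is written) is apply_async, which submits every task up front and waits at the end:

```python
import multiprocessing as mp

with mp.Pool(processes=nWorkers) as pool:
    # Submit all tasks without waiting for each one to finish
    results = [pool.apply_async(write_data, args=(X, Y, output, nFiles, i))
               for i in range(nFiles)]
    for r in results:
        r.get()   # blocks until that task is done; re-raises any worker exception
```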
-w option for nWorkers
examples/write_multiprocessing.py
Run sinfo2 in the shell to see node types
Request cores with --cpus-per-task
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=16GB
#SBATCH --time=1:00:00
#SBATCH --account=<account_id>
module purge
module load conda
eval "$(conda shell.bash hook)"
conda activate hpc-python
python3 /path/to/script.py
python3 write_multiprocessing.py -w 3 -n 100
examples/job-scripts/multicore.job
viztracer -o multiprocessing.json examples/write_multiprocessing.py -w 3 -n 100
vizviewer multiprocessing.json
| ncalls | tottime | tottime/3 | function |
|---|---|---|---|
| 1000 | 3.1 | 1.6 | io.open |
| 1001 | 0.9 | 0.3 | recv_bytes |
| 1000 | 0.48 | 0.16 | generate_data |
| ncalls | tottime | tottime (per worker) | function |
|---|---|---|---|
| 2003 | 8.2 | 2.7 | __enter__ |
| 1000 | 3.6 | 1.9 | io.open |
| 1003 | 7.2 | 2.5 | recv_bytes |
| 1000 | 0.6 | 0.2 | generate_data |
pool.apply(write_data,args=(X,Y,output,nFiles,i))
Every call sends X, Y, output, nFiles to a worker even though they never change
examples/write_multiprocessing_queue.py
class worker(mp.Process):
    ## Initialize static process data
    def __init__(self, task_queue, size,
                 output, nFiles, **kwargs):
        super(worker, self).__init__()
        x_origin = 0
        y_origin = 500
        x = np.arange(x_origin - size/2, x_origin + size/2, 1)
        y = np.arange(y_origin - size/2, y_origin + size/2, 1)
        self.X, self.Y = np.meshgrid(x, y)
        self.nFiles = nFiles
        self.output = output
        # Where to get work from
        self.task_queue = task_queue

    # Define work each process does
    def run(self):
        print("Starting Process: %d" % self.pid)
        time.sleep(1)
        while True:
            try:
                i = self.task_queue.get(timeout=1)
            except q.Empty:  # q is the standard-library queue module
                print("No more work to do")
                #self.terminate()
                break
            write_data(self.X, self.Y, self.output, self.nFiles, i)
            self.task_queue.task_done()
        return

examples/write_multiprocessing_queue.py
task_queue = mp.JoinableQueue()
# Create work
for i in range(0, nFiles):
    task_queue.put(i)
# Start workers
workers = []
for i in range(0, nWorkers):
    w = worker(task_queue, size, output, nFiles)
    w.start()
    workers.append(w)
print("\nWaiting for work to complete...\n")
task_queue.join()
for w in workers:
    print(f"Closing worker {w}")
    w.join()

| ncalls | tottime | tottime/3 | function |
|---|---|---|---|
| 1000 | 2.3 | 0.8 | io.open |
| 1000 | 0.5 | 0.17 | generate_data |
| 1000 | 0.03 | 0.01 | recv_bytes |
| ncalls | tottime | tottime (per worker) | function |
|---|---|---|---|
| 1003 | 2.4 | 0.8 | io.open |
| 1000 | 0.5 | 0.17 | generate_data |
| 1000 | 0.03 | 0.01 | recv_bytes |
| 2001 | 0.001 | ~0 | __enter__ |
examples/write_multiprocessing.py, 3 workers, ~7.6 s
| ncalls | tottime | tottime (per worker) | function |
|---|---|---|---|
| 2003 | 8.2 | 2.7 | __enter__ |
| 1003 | 7.2 | 2.5 | recv_bytes |
| 1000 | 3.6 | 1.9 | io.open |
| 1000 | 0.6 | 0.2 | generate_data |
examples/write_multiprocessing_queue.py, 3 workers, ~2.8 s
| ncalls | tottime | tottime (per worker) | function |
|---|---|---|---|
| 2001 | 0.001 | ~0 | __enter__ |
| 1000 | 0.03 | 0.01 | recv_bytes |
| 1003 | 2.4 | 0.8 | io.open |
| 1000 | 0.5 | 0.17 | generate_data |
examples/write_mpi.py
if rank == 0:
    # Summarize params
    print("My name is rank %d" % rank + " and I'm starting...")
    print('world_size=%s' % world_size)
    print('nFiles=%s' % nFiles)
    print('output_template=%s%%05d.txt' % output)
    print('size=%d' % size)
    recipient = 1
    data_chunks = np.array_split(data, n_chunks)
    #print(data_chunks)
    # Send one chunk of file indices to each of the other ranks
    for chunk in data_chunks[:-1]:
        comm.Send([chunk, MPI.INT], dest=recipient, tag=77)
        recipient += 1
    # Rank 0 keeps the last chunk for itself
    local_data = data_chunks[n_chunks - 1]
else:
    # Every other rank receives its chunk of file indices from rank 0
    local_data = np.empty(int(nFiles/n_chunks), dtype='i')
    comm.Recv([local_data, MPI.INT], source=0, tag=77)

# Each rank writes only the files in its own chunk
for i in local_data:
    write_data(X, Y, output, nFiles, i)
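The excerpt above assumes the usual mpi4py setup earlier in the script, roughly along these lines (a sketch; nFiles, size, and output come from the script's argument handling):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()          # this process's ID within the communicator
world_size = comm.Get_size()    # total number of MPI ranks
n_chunks = world_size           # one chunk of file indices per rank

if rank == 0:
    # Rank 0 owns the full list of file indices and hands pieces out
    data = np.arange(nFiles, dtype='i')
```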
#!/bin/bash
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=3GB
#SBATCH --time=1:00:00
#SBATCH --account=<account_id>
module purge
module load gcc/8.3.0
module load openblas/0.3.8
module load openmpi/4.0.2
module load pmix/3.1.3
module load python/3.7.6
srun --mpi=pmix_v2 python3 /path/to/script.py
srun --mpi=pmix_v2 python3 write_mpi.py -n 100
examples/job-scripts/mpi.job
With comm.Send and comm.Recv we can create our own workload manager
cProfile and pdb again
mpi4py has a few convenience functions for common tasks
comm.Scatter will share data to every worker
print("My name is rank %d" % rank + " and I'm starting...")
chunks = None
if rank == 0:
    # Send pieces of data from rank 0 to the whole world
    data = np.arange(nFiles, dtype='i')
    chunks = data.reshape((n_chunks, int(nFiles/n_chunks)))
recvbuf = np.empty(int(nFiles/n_chunks), dtype='i')
comm.Scatter(chunks, recvbuf, root=0)
for i in recvbuf:
    #print("writing ...", i)
    write_data(X, Y, output, nFiles, i)

Starting from examples/write_vectorized_binary.py we can modify write_data()
examples/write_hdf5.py
def write_data(X, Y, output, nFiles, i, hf):
    filename = "output/%s%05d" % (output, i)
    Z = generate_data(X, Y, nFiles, i)
    # Store the grid as a dataset inside one shared HDF5 file instead of a separate file
    hf.create_dataset(filename, data=Z)
hf = h5py.File('output/data.h5', 'w')
examples/write_vectorized_binary.py ~5.46s (1000 writes)
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 1000 | 1.098 | 0.001 | 1.098 | 0.001 | {built-in method io.open} |
| 1000 | 0.860 | 0.001 | 0.860 | 0.001 | write_vectorized_binary.py:6(generate_data) |
| 1000 | 0.517 | 0.001 | 0.518 | 0.001 | {method 'tofile' of 'numpy.ndarray' objects} |
| 637 | 0.451 | 0.001 | 0.451 | 0.001 | {built-in method posix.stat} |
examples/write_hdf5.py ~2.54s (1000 writes)
| ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
|---|---|---|---|---|---|
| 1000 | 0.873 | 0.001 | 0.873 | 0.001 | write_hdf5.py:7(generate_data) |
| 778 | 0.235 | 0.000 | 0.235 | 0.000 | {built-in method posix.stat} |
| 1000 | 0.211 | 0.000 | 0.242 | 0.000 | dataset.py:38(make_new_dset) |
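To check the result, the single HDF5 file can be opened read-only and its contents listed; a quick sketch using the file path from the slide:

```python
import h5py

with h5py.File('output/data.h5', 'r') as hf:
    names = []
    hf.visit(names.append)      # collect every group and dataset name in the file
    print(len(names), "objects, e.g.", names[:3])
```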
examples/write_final.py
hf = h5py.File('output/data.hdf5', 'w', driver='mpio', comm=comm)
write_data function is the same as the single-core version
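A minimal sketch of the parallel-HDF5 pattern (this is not write_final.py itself; it assumes h5py built with parallel HDF5 support, and the file and dataset names are illustrative):

```python
from mpi4py import MPI
import numpy as np
import h5py

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# All ranks open the same file collectively via the MPI-IO driver
with h5py.File('output/parallel_demo.h5', 'w', driver='mpio', comm=comm) as hf:
    # Dataset creation is collective: every rank must call it with identical arguments
    dset = hf.create_dataset('grid', (comm.Get_size(), 10), dtype='f8')
    # Each rank then writes only its own row, independently
    dset[rank, :] = np.full(10, rank, dtype='f8')
```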