- Setup
- How to profile code
- Look for “slow” sections
- Making improvements
- Single node parallel
  - Multiple CPUs, one computer
- Multi-node parallel
  - Multiple CPUs, multiple computers
2024-03-01
Get presentation materials
git clone https://github.com/uschpc/workshop-hpc-python.git
module purge
module load conda
eval "$(conda shell.bash hook)"
conda env create -f environment.yml
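Then activate the environment (the hpc-python name here matches the environment activated in the job scripts later in these materials):
conda activate hpc-python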
examples/write.py
python -m cProfile -s tottime write.py -n 100
python -m cProfile -o write.log examples/write.py -n 100
snakeviz write.log
- generate_data is called 1,000,000 times
- npyio.py (savetxt) is called 100 times
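If snakeviz is not available, the saved profile can also be inspected with the standard-library pstats module; a minimal sketch using the write.log file produced above:

import pstats

# Load the profile written with `-o write.log` and show the 10 most expensive functions
stats = pstats.Stats("write.log")
stats.sort_stats("tottime").print_stats(10)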
This snippet is not great
def write_data(x, y, n, t):
    filename = "output/%s%05d" % (output, t)
    i_max = len(x)
    j_max = len(y)
    data = np.zeros((i_max, j_max))
    # Evaluate every grid point one at a time in a double Python loop
    for i in range(i_max):
        for j in range(j_max):
            data[i, j] = generate_data(x[i], y[j], n, t)
    np.savetxt(filename, data)
where generate_data() is defined as:

def generate_data(x, y, n, t):
    value = (x/10)**2 * (y/10) + (y/10)**2 * (x/10) + t/n
    value = value % 1  # keep the value between 0 and 1
    return value
- n: the number of files to be generated
- t: which file to generate

numpy.meshgrid(*xi, copy=True, sparse=False, indexing='xy')

Return coordinate matrices from coordinate vectors. Make N-D coordinate arrays for vectorized evaluations of N-D scalar/vector fields over N-D grids, given one-dimensional coordinate arrays x1, x2, ..., xn.
def write_data(X, Y, n, t):
    filename = "output/%s%05d" % (output, t)
    Z = generate_data(X, Y, n, t)   # evaluate the whole grid in one call
    np.savetxt(filename, Z)
where

x = np.arange(x_origin - size/2, x_origin + size/2, 1)
y = np.arange(y_origin - size/2, y_origin + size/2, 1)
X, Y = np.meshgrid(x, y)
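Because generate_data() uses only elementwise arithmetic, it can be applied to the full X, Y grids at once, which removes the nested Python loops. A toy illustration of what np.meshgrid produces (values chosen arbitrarily here):

x = np.array([0, 1, 2])
y = np.array([10, 20])
X, Y = np.meshgrid(x, y)     # X.shape == Y.shape == (2, 3)
Z = (X / 10)**2 * (Y / 10)   # elementwise: evaluates every grid point in one call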
examples/write_vectorized.py
python3 -m cProfile -s tottime examples/write_vectorized.py -n 100 > write_vectorized.log
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
100 | 0.870 | 0.009 | 1.145 | 0.011 | npyio.py:1191(savetxt) |
200 | 0.160 | 0.001 | 0.162 | 0.001 | {built-in method io.open} |
115 | 0.075 | 0.001 | 0.075 | 0.001 | {built-in method io.open_code} |
34/32 | 0.054 | 0.002 | 0.058 | 0.002 | {built-in method _imp.create_dynamic} |
10000 | 0.051 | 0.000 | 0.051 | 0.000 | {method 'write' of '_io.TextIOWrapper' objects} |
100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized.py:6(generate_data) |
Original write.py
~4.8s
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
1000000 | 2.854 | 0.000 | 2.854 | 0.000 | write.py:6(generate_data) |
100 | 0.749 | 0.007 | 1.011 | 0.010 | npyio.py:1191(savetxt) |
100 | 0.673 | 0.007 | 4.540 | 0.045 | write.py:11(write_data) |
New write_vectorized.py
~ 1.4s
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
100 | 0.870 | 0.009 | 1.145 | 0.011 | npyio.py:1191(savetxt) |
100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized.py:6(generate_data) |
100 | 0.001 | 0.000 | 1.191 | 0.012 | write_vectorized.py:11(write_data) |
np.savetxt(filename,data)
np.save(filename,data)
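np.save stores the raw array in NumPy's binary .npy format instead of formatting every value as text; a small round-trip sketch (the filename here is only illustrative):

np.save("output/data00000", data)            # writes output/data00000.npy
data_back = np.load("output/data00000.npy")  # reads the array back unchanged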
write_vectorized.py
~1.4s
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
100 | 0.870 | 0.009 | 1.145 | 0.011 | npyio.py:1191(savetxt) |
100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized.py:6(generate_data) |
100 | 0.001 | 0.000 | 1.191 | 0.012 | write_vectorized.py:11(write_data) |
examples/write_vectorized_binary.py
~0.46s
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
100 | 0.174 | 0.002 | 0.174 | 0.002 | {built-in method io.open} |
100 | 0.044 | 0.000 | 0.044 | 0.000 | write_vectorized_binary.py:6(generate_data) |
100 | 0.001 | 0.000 | 0.220 | 0.002 | write_vectorized_binary.py:11(write_data) |
100 | 0.002 | 0.000 | 0.265 | 0.003 | npyio.py:457(save) |
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
with mp.Pool(processes=nWorkers) as pool:
    for i in range(0, nFiles):
        pool.apply(write_data, args=(X, Y, output, nFiles, i))
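Note that pool.apply blocks until each call returns. A non-blocking variant, sketched here with the same arguments, uses apply_async and waits on the results afterwards:

with mp.Pool(processes=nWorkers) as pool:
    # submit all tasks up front without waiting on each one
    results = [pool.apply_async(write_data, args=(X, Y, output, nFiles, i))
               for i in range(nFiles)]
    for r in results:
        r.get()   # wait for every task to finish before the pool closes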
Use the -w option to set nWorkers.
examples/write_multiprocessing.py
Run sinfo2 in the shell to see node types.
Request the cores with --cpus-per-task:

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=16GB
#SBATCH --time=1:00:00
#SBATCH --account=<account_id>

module purge
module load conda
eval "$(conda shell.bash hook)"
conda activate hpc-python

python3 /path/to/script.py
python3 write_multiprocessing.py -w 3 -n 100
examples/job-scripts/multicore.job
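The job script can then be submitted to Slurm in the usual way:

sbatch examples/job-scripts/multicore.job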
viztracer -o multiprocessing.json examples/write_multiprocessing.py -w 3 -n 100
vizviewer multiprocessing.json
ncalls | tottime | tottime/3 | function |
---|---|---|---|
1000 | 3.1 | 1.6 | io.open |
1001 | 0.9 | 0.3 | recv_bytes |
1000 | 0.48 | 0.16 | generate_data |
ncalls | tottime | tottime (per worker) | function |
---|---|---|---|
2003 | 8.2 | 2.7 | __enter__ |
1000 | 3.6 | 1.9 | io.open |
1003 | 7.2 | 2.5 | recv_bytes |
1000 | 0.6 | 0.2 | generate_data |
pool.apply(write_data, args=(X, Y, output, nFiles, i))
Every call pickles and re-sends X, Y, output, and nFiles to the worker, even though they never change.
examples/write_multiprocessing_queue.py
class worker(mp.Process):
    # Initialize static process data
    def __init__(self, task_queue, size, output, nFiles, **kwargs):
        super(worker, self).__init__()
        x_origin = 0
        y_origin = 500
        x = np.arange(x_origin - size/2, x_origin + size/2, 1)
        y = np.arange(y_origin - size/2, y_origin + size/2, 1)
        self.X, self.Y = np.meshgrid(x, y)
        self.nFiles = nFiles
        self.output = output
        # Where to get work from
        self.task_queue = task_queue

    # Define work each process does
    def run(self):
        print("Starting Process: %d" % self.pid)
        time.sleep(1)
        while True:
            try:
                i = self.task_queue.get(timeout=1)
            except q.Empty:   # q is the standard-library queue module
                print("No more work to do")
                break
            write_data(self.X, self.Y, self.output, self.nFiles, i)
            self.task_queue.task_done()
        return
examples/write_multiprocessing_queue.py
task_queue = mp.JoinableQueue()

# Create work
for i in range(0, nFiles):
    task_queue.put(i)

# Start workers
workers = []
for i in range(0, nWorkers):
    w = worker(task_queue, size, output, nFiles)
    w.start()
    workers.append(w)

print("\nWaiting for work to complete...\n")
task_queue.join()

for w in workers:
    print(f"Closing worker {w}")
    w.join()
ncalls | tottime | tottime/3 | function |
---|---|---|---|
1000 | 2.3 | 0.8 | io.open |
1000 | 0.5 | 0.17 | generate_data |
1000 | 0.03 | 0.01 | recv_bytes |
ncalls | tottime | tottime (per worker) | function |
---|---|---|---|
1003 | 2.4 | 0.8 | io.open |
1000 | 0.5 | 0.17 | generate_data |
1000 | 0.03 | 0.01 | recv_bytes |
2001 | 0.001 | ~0 | __enter__ |
examples/write_multiprocessing.py
3 workers, ~7.6 s
ncalls | tottime | tottime (per worker) | function |
---|---|---|---|
2003 | 8.2 | 2.7 | __enter__ |
1003 | 7.2 | 2.5 | recv_bytes |
1000 | 3.6 | 1.9 | io.open |
1000 | 0.6 | 0.2 | generate_data |
examples/write_multiprocessing_queue.py
3 workers, ~2.8 s
ncalls | tottime | tottime (per worker) | function |
---|---|---|---|
2001 | 0.001 | ~0 | __enter__ |
1000 | 0.03 | 0.01 | recv_bytes |
1003 | 2.4 | 0.8 | io.open |
1000 | 0.5 | 0.17 | generate_data |
examples/write_mpi.py
if rank == 0:
    # Summarize params
    print("My name is rank %d" % rank + " and I'm starting...")
    print('world_size= %s' % world_size)
    print('nFiles=%s' % nFiles)
    print('output_template=%s%%05d.txt ' % output)
    print('size=%d' % size)
    # Split the work and send one chunk to each of the other ranks
    recipient = 1
    data_chunks = np.array_split(data, n_chunks)
    for chunk in data_chunks[:-1]:
        comm.Send([chunk, MPI.INT], dest=recipient, tag=77)
        recipient += 1
    local_data = data_chunks[n_chunks-1]
else:
    local_data = np.empty(int(nFiles/n_chunks), dtype='i')
    comm.Recv([local_data, MPI.INT], source=0, tag=77)

for i in local_data:
    write_data(X, Y, output, nFiles, i)
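The snippet above assumes the usual mpi4py setup; roughly (variable names inferred from the code):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
world_size = comm.Get_size()
n_chunks = world_size
data = np.arange(nFiles, dtype='i')   # the file indices to be distributed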
#!/bin/bash
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=3GB
#SBATCH --time=1:00:00
#SBATCH --account=<account_id>

module purge
module load gcc/8.3.0
module load openblas/0.3.8
module load openmpi/4.0.2
module load pmix/3.1.3
module load python/3.7.6

srun --mpi=pmix_v2 python3 /path/to/script.py
srun --mpi=pmix_v2 python3 write_mpi.py -n 100
examples/job-scripts/mpi.job
With comm.Send and comm.Recv we can create our own workload manager.
cProfile and pdb can be used again to check performance.
mpi4py has a few convenience functions for common tasks.
comm.Scatter will share data with every worker:

print("My name is rank %d" % rank + " and I'm starting...")
chunks = None
if rank == 0:
    # Send pieces of data from rank 0 to the whole world
    data = np.arange(nFiles, dtype='i')
    chunks = data.reshape((n_chunks, int(nFiles/n_chunks)))
recvbuf = np.empty(int(nFiles/n_chunks), dtype='i')
comm.Scatter(chunks, recvbuf, root=0)
for i in recvbuf:
    write_data(X, Y, output, nFiles, i)
examples/write_vectorized_binary.py
We can modify write_data():
examples/write_hdf5.py
def write_data(X, Y, output, nFiles, i, hf):
    filename = "output/%s%05d" % (output, i)
    Z = generate_data(X, Y, nFiles, i)
    hf.create_dataset(filename, data=Z)
hf = h5py.File('output/data.h5', 'w')
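For reference, the resulting file can be inspected afterwards with h5py; a minimal sketch:

import h5py

with h5py.File('output/data.h5', 'r') as hf:
    hf.visit(print)   # print every group and dataset name stored in the file
    # an individual dataset can be read back as a numpy array, e.g. Z = hf[name][:]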
examples/write_vectorized_binary.py
~5.46s (1000 writes)
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
1000 | 1.098 | 0.001 | 1.098 | 0.001 | {built-in method io.open} |
1000 | 0.860 | 0.001 | 0.860 | 0.001 | write_vectorized_binary.py:6(generate_data) |
1000 | 0.517 | 0.001 | 0.518 | 0.001 | {method 'tofile' of 'numpy.ndarray' objects} |
637 | 0.451 | 0.001 | 0.451 | 0.001 | {built-in method posix.stat} |
examples/write_hdf5.py
~2.54s (1000 writes)
ncalls | tottime | percall | cumtime | percall | filename:lineno(function) |
---|---|---|---|---|---|
1000 | 0.873 | 0.001 | 0.873 | 0.001 | write_hdf5.py:7(generate_data) |
778 | 0.235 | 0.000 | 0.235 | 0.000 | {built-in method posix.stat} |
1000 | 0.211 | 0.000 | 0.242 | 0.000 | dataset.py:38(make_new_dset) |
examples/write_final.py
hf = h5py.File('output/data.hdf5', 'w', driver='mpio', comm=comm)
The write_data function is the same as in the single-core version.