Maxwell : Nextflow using slurm

Tp provide a primitive example for nextflow data processing, I used crystfel offline processing in it's most simple form. For this particular case, I selected roughly 10k images from cxidb, split the set of images into bunches of roughly equal size, and distribute the jobs as slurm jobs on the cluster. nextflow takes care of the orchestration of the job submission, so you don't have to work with complicated constructs of job-arrays and dependencies.

The sample aims to illustrate very few of nextflows features, likeĀ 

  • creating and feeding channels
  • pipe-lining processes
  • distributing the same channels to two processes
  • merging channels

The image gives a rough idea of the workflow.

Content


The job submission script

A sample submission script could look like this:

nextflow job submission script
#!/bin/bash
#SBATCH --time=0-02:00:00
#SBATCH --nodes=1
#SBATCH --partition=all
#SBATCH --job-name=nextflow-splitpernode-allcpus
#SBATCH --dependency=singleton
#SBATCH --output=nextflow-splitpernode-allcpus-nodes-16.out
#SBATCH --tasks-per-node=1
#SBATCH --constraint=Gold-6240

export LD_PRELOAD=""
export NXF_VER=19.09.0-edge
export NXF_CLUSTER_SEED=$(shuf -i 0-16777216 -n 1)
export NXF_ANSI_LOG=false
export NXF_ANSI_SUMMARY=false
export SBATCH_CONSTRAINT=Gold-6240

numnodes=16

source /etc/profile.d/modules.sh
module load mpi/openmpi-x86_64
module load crystfel

rm -rf .nextflow* $HOME/.nextflow* work outdir/*
rm -rf procdir; mkdir -p procdir

report="${SLURM_JOB_NAME}-nodes-${numnodes}.report.html"
num_images=$(ls -1 /beegfs/desy/group/it/ReferenceData/cxidb/ID-21/cxidb-21-run01[34]*/data1/LCLS*.h5 | wc -l)

#
# prepare for uploads to desycloud
#
URL="https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow/${SLURM_JOB_NAME}-nodes-${numnodes}-${SLURM_JOB_ID}"
perl -pi -e "s|URL=.*|URL=\"$URL\"|" indexamajig.nf.split

test=$(curl -s -n -X PROPFIND -H "Depth: 1" "https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow" | grep "200 OK" | wc -l)
if [[ $test -eq 0 ]]; then
    curl -s -n -X MKCOL "https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow" > /dev/null
fi
curl -s -n -X MKCOL "$URL" > /dev/null


#
# split image files into $numnodes sets
#
ls -1 /beegfs/desy/group/it/ReferenceData/cxidb/ID-21/cxidb-21-run01[34]*/data1/LCLS*.h5 > procdir/files.to.process 
splitlines=$(( $(cat procdir/files.to.process |  wc -l) / $numnodes  ))
split -l $splitlines -d procdir/files.to.process procdir/xxx

#
#  Run workflow
#
start=`date +%s`
startdate=`date`

/software/workflows/nextflow/bin/nextflow run indexamajig.nf.split \
   --in '/beegfs/desy/user/schluenz/Crystfel.Nextflow/procdir/xxx*' \
   -with-report $report

enddate=`date`
end=`date +%s`
runtime=$((end-start))


#
#  Upload results to desyloud
#
#for file in indexamajig.nf.split nextflow-indexamajig.sh outdir/peaks.png outdir/cells.png $report ; do
for file in indexamajig.nf.split nextflow-indexamajig.sh $report ; do
    target=$(echo $file | rev | cut -d/ -f1 | rev)
    curl -s -n -T $file "$URL/$target" > /dev/null
done


cat <<EOF

Summary:
------------------------------------------------------------------------------------------------------
JOB.............: $SLURM_JOB_NAME
JobID...........: $SLURM_JOB_ID
Partition.......: $SLURM_JOB_PARTITION
Nodes...........: $numnodes
Start...........: $startdate
End.............: $enddate
Time............: $runtime
Number of Images: $num_images
URL.............: https://desycloud.desy.de/index.php/apps/files/?dir=/Maxwell/nextflow/

------------------------------------------------------------------------------------------------------

EOF


In detail

The first part of the submission script just sets the environment. Please note that the "base job" allocates only a single node!

environment
#!/bin/bash
#SBATCH --time=0-02:00:00
# only one node is needed. The base job will then submit $numnodes child jobs, see below:
#SBATCH --nodes=1
#SBATCH --partition=all
#SBATCH --job-name=nextflow
# just to avoid that benchmark jobs interfere with each other:
#SBATCH --dependency=singleton
#SBATCH --output=nextflow.out
# this constraint is not really needed:
#SBATCH --constraint=Gold-6240
 
# when submitting from max-display suppress the LD_PRELOAD warnings:
export LD_PRELOAD=""
 
# I want to use DSL 2.0 so use a newer version:
export NXF_VER=19.09.0-edge
 
export NXF_CLUSTER_SEED=$(shuf -i 0-16777216 -n 1)
export NXF_ANSI_LOG=false
export NXF_ANSI_SUMMARY=false
 
# for benchmarks: make sure to run all jobs on identical hardware
export SBATCH_CONSTRAINT=Gold-6240
 
# environment for the processes to run
source /etc/profile.d/modules.sh
module load mpi/openmpi-x86_64
module load crystfel

The rest is the preparation of the input file, and running the actual workflow:

preparation & workflow
# Declare the number of child jobs to be run:
numnodes=16

# define a unique name for the report written by nextflow:
report="${SLURM_JOB_NAME}-nodes-${numnodes}.report.html"

# just count how many image to process in total:
num_images=$(ls -1 /beegfs/desy/group/it/ReferenceData/cxidb/ID-21/cxidb-21-run01[34]*/data1/LCLS*.h5 | wc -l)

# split image files into $numnodes sets of equal size:
ls -1 /beegfs/desy/group/it/ReferenceData/cxidb/ID-21/cxidb-21-run01[34]*/data1/LCLS*.h5 > procdir/files.to.process 
splitlines=$(( $(cat procdir/files.to.process |  wc -l) / $numnodes  ))
split -l $splitlines -d procdir/files.to.process procdir/xxx

# Run workflow
# - the actual workflow is defined in indexamajig.nf.split. That will be explained below
# - --in '<path-to>/procdir/xxx*' defines the input files to be processed by the workflow
# - -with-report $report  we want an html report at the end of job
/software/workflows/nextflow/bin/nextflow run indexamajig.nf.split \
   --in '<path-to>/procdir/xxx*' \
   -with-report $report


Just for fun save the output files to desycloud:

prepare cloud upload
#
# prepare for uploads to desycloud
#  - this will create a unique new folder on desycloud. You need a ~/.netrc file to upload files without username/password
#
URL="https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow/${SLURM_JOB_NAME}-nodes-${numnodes}-${SLURM_JOB_ID}"

# make sure top level folder exists:
test=$(curl -s -n -X PROPFIND -H "Depth: 1" "https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow" | grep "200 OK" | wc -l)
if [[ $test -eq 0 ]]; then
    curl -s -n -X MKCOL "https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow" > /dev/null
fi
# create the new folder for future uploads
curl -s -n -X MKCOL "$URL" > /dev/null

# we will also upload some files as part of the workflow, so make sure the workflow knows the target. There are certainly better ways to do it!
perl -pi -e "s|URL=.*|URL=\"$URL\"|" indexamajig.nf.split

After all images have been processed we actually upload the nextflow report and files to run the workflow

cloud upload
#
#  Upload results to desyloud
#
for file in indexamajig.nf.split nextflow-indexamajig.sh $report ; do
    target=$(echo $file | rev | cut -d/ -f1 | rev)
    curl -s -n -T $file "$URL/$target" > /dev/null
done


The workflow declaration

Again that's just one possible way to define the workflow to illustrate some basics. It uses version2 of the workflow description language, which I personally like much better.

indexamamjig.nf.split: indexamajig
process indexamajig {  /*  all this could also be defined in a profile! */
    executor 'slurm'   /*  use slurm to launch processes                */
    queue    'all'     /*  use all, we want to run 16 jobs in parallel  */
    time     '1h'      /*  actually goes much faster                    */

    input:
      path x           /* will actually contain the procdir/xxx* defined above */

    output:
      file 'indexamajig.out'  /* each process will operate in a different folder, so all output files can be named identical */

    /* the actual processing 
       - we use all (nproc) cores
    */
    script:            
    """
    np=\$(nproc)       /* we use all cores of a machine */
    indexamajig -i "$x" -j \$np -g /beegfs/desy/user/schluenz/Crystfel.Bench/5HT2B-Liu-2013.geom --peaks=hdf5 -o indexamajig.out   
    """
}


process peak_count {
    executor 'slurm'
    queue    'all'
    time     '1h'

    input:
      file x

   output:
      file 'peak_count.out'

   script:
     """
     /home/schluenz/beegfs/Crystfel.Nextflow/peak_count.sh $x > peak_count.out
     """

}

process collect_index_output {
  maxForks 1
  executor 'local'

  publishDir './outdir'

  input:
    file '*.out'

  output:
    file 'indexamajig.all'
    
  """
  cat *.out >> indexamajig.all
  """
}

process collect_peak_info {
  maxForks 1
  executor 'local'

  publishDir './outdir'

  input:
    file '*.out'

  output:
    file 'peaks.info'
    
  """
  cat *.out >> peaks.info
  """
}


process simple_cell_plot {

  maxForks 1
  executor 'local'

  publishDir './outdir'

  input:
   path x

  output:
   file 'cells.png'

  script:
  """
  /beegfs/desy/user/schluenz/Crystfel.Nextflow/plot.cell.py $x
  """

}

process simple_peak_plot {
  maxForks 1
  executor 'local'

  publishDir './outdir'

  input:
   path x

  output:
   file 'peaks.png'

  script:
  """
  /beegfs/desy/user/schluenz/Crystfel.Nextflow/plot.peaks.py $x
  """

}

process upload_image {

  executor 'local'

  input:
   path x

  script:
  """
    target=\$(echo $x | rev | cut -d/ -f1 | rev)
    URL="https://desycloud.desy.de/remote.php/webdav/Maxwell/nextflow/nextflow-splitpernode-allcpus-nodes-16-3467837"
    curl -s -n -T $x "\$URL/\$target" 
  """

}


workflow {

/* run indexamajig and pipe to peak_count */
     data = Channel.fromPath(params.in)
     indexamajig(data) | peak_count 

/* write output to  slurm log and ./outdir/indexamajig.all */
     collect_index_output(indexamajig.out.collect())

/* extract number of peaks and lattices and write to slurm log and ./outdir/peaks.info */
     peakfile=Channel.fromPath('/beegfs/desy/user/schluenz/Crystfel.Nextflow/outdir/peaks.info')
     collect_peak_info(peak_count.out.collect()) |  (simple_peak_plot & simple_cell_plot) | mix() | upload_image

}

indexamamjig.nf.split: the workflow declaration
/* Let's start at the end */

workflow {

     /* the filenames from the script create a so called channel */
     data = Channel.fromPath(params.in)

     /* indexamajig function will process each of the files.  The result for each of the processes will be piped into a function called peak_count */
     indexamajig(data) | peak_count 

     /* All results are in separate files. Simply collect it in arbitrary order */
     collect_index_output(indexamajig.out.collect())

     /* extract number of peaks and lattices and write to slurm log and ./outdir/peaks.info */
     /* peakfile is again a channel */
     peakfile=Channel.fromPath('/beegfs/desy/user/schluenz/Crystfel.Nextflow/outdir/peaks.info')
     /* collect_peak info aggregates the output from peak_count runs, and pipes it into two plot routines.
        the plot routines generate two images which populate two output channels. mix() merges the channels,
         so that the function "upload_image" can upload all images in the channel to desycloud 
     */
     collect_peak_info(peak_count.out.collect()) |  (simple_peak_plot & simple_cell_plot) | mix() | upload_image
}

Results

The scriplet above where used to run simple benchmarks. Execution of the code was done for various number of nodes in a single run, ranging from 1 to 58 concurrent jobs and nodes. Scaling is far from linear, but always also quite a bit of concurrent i/o which is presumably the limiting factor. The time to process the 9280 images can be reduced to about 300s, or 0.03s per image. Peak counting and plotting don't contribute significantly to execution times.

Just for sake of completeness, the two plot generated as part of the workflow:


Additional files

peak_count.sh
#!/bin/bash
x=$1

images=$(grep "Image file" $x  | awk '{print $3}' | rev | cut -d"/" -f1 | rev )
extract=$(egrep 'num_peaks|Image f|Cell|lattice|centering' $x )

for image in $images ; do
    z=$(echo "$extract" | sed -n "/$image/,/Image f/p"  ) 
    npeaks=$( echo "$z" | grep num_peaks | awk '{print $3}')
    filename=$image
    cell=$( echo "$z" | grep "Cell parameters" | awk '{print $3, $4, $5, $7, $8, $9 }' )
    centering=$( echo "$z" | grep centering | awk '{print $3}' )
    lattice=$( echo "$z" | grep lattice_type | awk '{print $3}' )
    if [[ "x$cell" == "x" ]]; then
	cell="0.00 0.00 0.00 0.00 0.00 0.00"
	centering="0"
	lattice="0"
    fi
    if [[ "x$filename" != "x" ]]; then
	run=$(echo $filename | cut -d_ -f4)
	imgno=$(echo $filename | cut -d_ -f5,6 | cut -d. -f1 )
	echo "$filename $run $imgno $npeaks $cell $centering $lattice "
    fi
done

exit

The python plots were just for my entertainment, don't take it seriously!

plot.peaks.py
#!/usr/bin/python3
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
from cycler import cycler
import sys

try:
    infile = sys.argv[1]
except:
    print(' Provide an input file')
    exit()

npeaks=[]
run=[]
srun=[]

n=0
lineList = [line.rstrip('\n').split() for line in open(infile)]
lineList.sort()
for line in lineList:
    if len(line) > 1:
        this_run=line[1]
        if n==0:
            run.append(this_run)
            srun.append(0)
        elif this_run != run[-1]:
            run.append(this_run)
            srun.append(n)

        npeaks.append(int(line[3]))
        n=n+1

srun.append(n)

colormap = plt.cm.gist_ncar
colors = [colormap(i) for i in np.linspace(0, 1,len(srun))]

plt.style.use('dark_background')
plt.ylabel("peaks per image")
plt.margins(0.2)
plt.subplots_adjust(bottom=0.15)
plt.xlim(left=0)
plt.xlim(right=len(npeaks))

for i in range(len(srun)-1):
    x = np.arange(srun[i],srun[i+1])
    plt.bar(x, npeaks[srun[i]:srun[i+1]], width=1., color=colors[i]) 
    plt.vlines(srun[i]+0.6,0,max(npeaks)+20,linestyles='dotted',colors='r',label='')
    plt.hlines(np.median(npeaks[srun[i]:srun[i+1]]),srun[i],srun[i+1]-1,colors=colors[i],linestyles='solid',label=run[i])

plt.ylim(top=max(npeaks)+20)
plt.legend(loc='upper left', prop={'size':6}, bbox_to_anchor=(1,1))

plt.gcf().savefig('peaks.png')

plot.cell.py
#!/usr/bin/python3
import matplotlib as mpl
mpl.use('Agg')
import scipy.stats
import matplotlib.pyplot as plt
import numpy as np
import sys

try:
    infile = sys.argv[1]
except:
    print(' Provide an input file')
    exit()

run=[]
srun=[]


n=0
lineList = [line.rstrip('\n').split() for line in open(infile)]
lineList.sort()
cell=[]
ctmp=[]*6

for line in lineList:
    if float(line[4]) > 0.0:
        ctmp=[float(i) for i in line[4:10]]
        cell.append(ctmp)
        n=n+1

x = np.linspace(0, 2 * np.pi, 400)
y = np.sin(x ** 2)

cells = np.array(cell)

plt.style.use('dark_background')

fig, axs = plt.subplots(2, 3)
fig.suptitle('cell dimensions')

axs[0, 0].set_title('A')
axs[0, 1].set_title('B')
axs[0, 2].set_title('C')
axs[1, 0].set_title(r'$\alpha$')
axs[1, 1].set_title(r'$\beta$')
axs[1, 2].set_title(r'$\gamma$')

for ax in axs.flat:
    ax.yaxis.set_major_locator(plt.NullLocator())

for i in range(3):
    n, bins, patches = axs[0, i].hist(cells[:,i], 50, density=1)
    (mu, sigma) = scipy.stats.norm.fit(cells[:,i])
    y = scipy.stats.norm.pdf(bins, mu, sigma)
    l = axs[0,i].plot(bins,y,linewidth=0.8)
    median = np.median(cells[:,i])
    textstr = "{:.3f}".format(median)
    axs[0,i].vlines(median,0,max(n),colors='r',linestyles='dotted',label=textstr)
    axs[0,i].legend(prop={'size': 'xx-small'})

    n, bins, patches = axs[1, i].hist(cells[: , i+3], 50, density=1)
    (mu, sigma) = scipy.stats.norm.fit(cells[: , i+3])
    y = scipy.stats.norm.pdf( bins, mu, sigma)
    l = axs[1, i].plot(bins, y, linewidth=0.8)
    median = np.median(cells[:,i+3])
    textstr = "{:.3f}".format(median)
    axs[1, i].vlines(median,0,max(n),colors='r',linestyles='dotted',label=textstr)
    axs[1,i].legend(prop={'size': 'xx-small'})


plt.subplots_adjust(hspace=0.5)

fig.savefig('cells.png')


Attachments:

nextflow (application/gliffy+json)
nextflow.png (image/png)
nextflow-1.png (image/png)
nextflow-2.png (image/png)
nextflow-split-load.png (image/png)
peaks.png (image/png)
cells.png (image/png)