Occasionally one needs to run a large number of fairly identical single-threaded jobs. Running each of these jobs as a separate slurm job is - in our environment - very inefficient, since each job uses only a single core, leaving all other cores of the node idle. Generally, BIRD would be much better suited, but it is not always an option due to data access requirements.

GNU parallel offers handy possibilities to parallelize single-threaded processes. The documentation (https://www.gnu.org/software/parallel/parallel_tutorial.html) provides a number of examples. It's an amazingly powerful tool, and the following "real life" examples only scratch the surface of its capabilities in a rather primitive way.
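As a minimal illustration of the basic idea (the file pattern and the gzip command are just placeholders, not part of the examples below), compressing all files in a directory with one single-threaded gzip per available core could look like this:

#  run one gzip per core; {} is replaced by each input line coming from find
find . -name '*.dat' -type f | parallel -j $(nproc) gzip {}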

Citation for GNU parallel

O. Tange (2011): GNU Parallel - The Command-Line Power Tool, ;login: The USENIX Magazine, February 2011:42-47.

Location of scriptlets: /beegfs/desy/user/schluenz/Process_with_GNU_parallel

parallel listening on fifo

It's possible to let parallel listen to a named pipe (FIFO). One could use it as a kind of daemon that watches for incoming events and passes the input on to a scriptlet. The input can be a single word, but it can also contain an entire input file. A most basic filewatcher could be composed of a slurm batch script and file-handling scripts. The slurm batch script:

#!/bin/bash
#SBATCH --time=0-00:20:00
unset LD_PRELOAD

#
#  output file
#
rm -f /beegfs/desy/user/$USER/processing/processed.out
touch /beegfs/desy/user/$USER/processing/processed.out
#
#  setup fifo and let parallel listen to it
#
mkdir -p /tmp/$USER
FIFO=/tmp/$USER/pifo
rm -f $FIFO
mkfifo $FIFO

while true; do cat $FIFO; done | parallel -j 60 --ungroup --gnu --eof "end_of_file" /beegfs/desy/user/$USER/processing/process_files.sh "{}" &

#
# watch for files.
# any new file will be sent to parallel for processing
# as soon as a file named X_DONE_X appears, the job will terminate (after all processing is done)
#
IMG_DIR=/beegfs/desy/user/$USER/processing/Images

nfile=0

while true; do
    #
    #  send all files existing at startup to parallel
    #  and all newly occurring files
    #  find is cheap as it doesn't stat files
    #
    if [[ ! -f /dev/shm/img.lst ]]; then
	find $IMG_DIR -type f | sort > /dev/shm/img.lst
	to_proc=$(cat /dev/shm/img.lst)
    else
	mv /dev/shm/img.lst /dev/shm/img.old
	find $IMG_DIR -type f | sort > /dev/shm/img.lst
	to_proc=$(comm -13 /dev/shm/img.old /dev/shm/img.lst)
    fi

    for new in $to_proc ; do
	echo $new > $FIFO
	nfile=$(($nfile+1))
    done

    #
    #  X_DONE_X signals the end of processing, but wait for running processes to finish
    #  the X_DONE_X marker itself was counted above, so subtract it from the file count
    #
    if [[ "$to_proc" =~ "X_DONE_X" ]]; then
	nfile=$(($nfile-1))
	test=999
	# parallel catches sigterm
	nprocessed=$(cat /beegfs/desy/user/$USER/processing/processed.out | wc -l)
	while [[ $test -gt 1 || $nfile -gt $nprocessed ]]; do
	    test=$(ps -alef | grep process_files | grep -v grep | wc -l)
	    nprocessed=$(cat /beegfs/desy/user/$USER/processing/processed.out | wc -l)
	    sleep 1
	done
	echo "stopping this JOB at $(date)" 
	scancel $SLURM_JOB_ID
    fi

    sleep 0.1
done

The file-processing script:

#!/bin/bash
input=$1

if [[ $input == "X_DONE_X" ]]; then
    echo "not processing DONE signal"
#
#   could add a termination scriptlet here
#
else
    echo new file $input at $(date)
    sleep $((1 + $RANDOM % 100))
fi
echo $input >> /beegfs/desy/user/$USER/processing/processed.out

One of the downsides of this primitive setup: it's not possible to send an "EOF" through the FIFO to terminate parallel. So one needs some kind of signal to terminate the batch-job (or run scancel once it's done).

Note: parallel won't start any processing before it has received as many lines of input as the number of parallel jobs (60 in the example above)! If that's too slow, feed parallel with dummy jobs beforehand!
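For example (a sketch only; the X_DUMMY_X marker and its handling are not part of the scripts above), one could push one dummy line per job slot into the FIFO right after parallel has been started, and let process_files.sh simply drop these lines again:

#
#  prime parallel with one dummy line per job slot (60 in the example above)
#  place this in the batch script right after parallel has been started
#
for i in $(seq 1 60); do
    echo "X_DUMMY_X" > $FIFO
done

and, at the top of process_files.sh:

if [[ $input == "X_DUMMY_X" ]]; then
    exit 0    # ignore priming input; in particular, don't append it to processed.out
fi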

Running dozor over cbf files

Pretty much the same setup can be used to process cbf images generated by a grid scan. To simulate that behaviour, a small scriptlet copies 1000 images into an empty folder:

#!/bin/bash
for file in /beegfs/desy/user/$USER/TEST/_*.cbf ; do
    cp -v $file /beegfs/desy/user/$USER/processing/Images
done
touch  /beegfs/desy/user/$USER/processing/Images/X_DONE_X_000001.cbf

During the tests the file copy was started after the batch-job was already running, but it basically doesn't matter. The "filewatcher" script looks very similar to the one above:

#!/bin/bash
#SBATCH --time=0-00:20:00
unset LD_PRELOAD
source /etc/profile.d/modules.sh
module load xray

#
#  output file
#
procdir=/beegfs/desy/user/$USER/processing
procfile=$procdir/dozor.out
rm -f $procfile
touch $procfile
mkdir -p $procdir/dozor     # directory for the generated per-image dozor input files
#
#  setup fifo
#
mkdir -p /tmp/$USER
FIFO=/tmp/$USER/pifo
rm -f $FIFO
mkfifo $FIFO

# let parallel listen to fifo
nparallel=$(nproc)
while true; do cat $FIFO; done | parallel -j $nparallel --ungroup --gnu --eof "end_of_file" /beegfs/desy/user/$USER/processing/process_dozor.sh "{}" &

#
# watch for files.
# any new file will be sent to parallel for processing
# as soon as a file whose name contains X_DONE_X appears, the job will terminate (after all processing is done)
#
IMG_DIR=/beegfs/desy/user/$USER/processing/Images

#
#  for a test, I started with an empty dir
#  one cbf file is copied in per ~0.1s
#  run $nparallel dozor jobs at a time
#  for simplicity, reading the parameters from a master file is omitted;
#  fixed values are passed to the dozor runs via the template below
#

cat << EOF > $procdir/template.dat
nx 2463
ny 2527
pixel 0.172
detector_distance 200.0
X-ray_wavelength 1.03315
orgx 1259
orgy 1240
ix_min 0
ix_max 0
iy_min 0
iy_max 0
first_image_number _START_
number_images _NIMAGES_
name_template_image $IMG_DIR/TEST_?????.cbf
library /opt/xray/xdsapp3.0/plugins/xds-zcbf.so
EOF

nfile=0

while true; do
    #
    #  send all files existing at startup to parallel
    #  and all newly occurring files
    #  find is cheap as it doesn't stat files
    #
    if [[ ! -f /dev/shm/img.lst ]]; then
	find $IMG_DIR -type f | sort > /dev/shm/img.lst
	to_proc=$(cat /dev/shm/img.lst)
    else
	mv /dev/shm/img.lst /dev/shm/img.old
	find $IMG_DIR -type f | sort > /dev/shm/img.lst
	to_proc=$(comm -13 /dev/shm/img.old /dev/shm/img.lst)
    fi

    for new in $to_proc ; do
	NIMAGES=1
	basename=$(echo $new | rev | cut -d/ -f1|rev)                        # strip the directory part
	START=$(echo $basename | rev | cut -d_ -f1|rev | cut -d. -f1 | bc)   # image number, leading zeros stripped by bc
	echo "$basename $START $NIMAGES $(date)"
	perl -p -e "s|_START_|$START|" $procdir/template.dat | perl -p -e "s|_NIMAGES_|$NIMAGES|" > $procdir/dozor/$basename.dat
	echo "$procdir/dozor/$basename.dat $procfile" > $FIFO
	nfile=$(($nfile+1))
    done

    #
    #  a file named *X_DONE_X* signals the end of processing
    #  it doesn't have to be a file; the marker could also simply be sent to the FIFO
    #
    if [[ "$to_proc" =~ "X_DONE_X" ]]; then
	running=9999
	nprocessed=$(cat $procfile | wc -l)

	#
	# parallel catches SIGTERM. So when the slurm job is cancelled, it keeps already
	# running processes alive, but doesn't start new ones; this gives up to ~300s of extra
	# time to finish the processing. Still, don't cancel the job while processes are running.
	#
	while [[ $running -gt 1 || $nfile -gt $nprocessed ]]; do
	    running=$(ps -alef | grep process_dozor | grep -v grep | wc -l)
	    nprocessed=$(cat $procfile | wc -l)
	    sleep 1
	done
	echo "stopping this JOB at $(date)" 
	scancel $SLURM_JOB_ID
    fi

    sleep 0.1
done

The dozor scriptlet doing the actual processing:

#!/bin/bash
input=($1)            # word-splitting of $1: input[0] = dozor input file, input[1] = output file

if [[ ${input[0]} =~ "X_DONE_X" ]]; then
    echo "not processing DONE signal"
#
#   could add a termination scriptlet here
#
else
    echo running  /opt/xray/bin/dozor ${input[0]} at $(date)
    /opt/xray/bin/dozor ${input[0]} >> ${input[1]}
fi
exit


The test run was for 1000 cbf images. The time to "generate" the images was about 90 seconds. The time to process them was also about 90 seconds, so for this particular scenario the processing is essentially real-time.

When images were already present, processing took about 30s.

Feeding the FIFO from remote

In principle it's possible to feed the FIFO through ssh or netcat from a remote host. Here is a simple recipe ...
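For the ssh variant (not used in the recipe below), a single processing request could be pushed into the FIFO on the compute node roughly like this, using the FIFO path from the listener script further down and assuming the same $USER on both hosts:

#  sketch: write one line into the remote FIFO via ssh instead of netcat
echo "$output $pars" | ssh $node "cat > /tmp/$USER/loopFF"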

The following script submits the batch script and feeds the remote listener with the processing information:

#!/bin/bash
#
#  submit the batch script
#

j=$(sbatch listener_v2.sh)
jobid=$(echo $j | awk '{print $4}')

#
#  wait for the job to run
#

while [[ $(sacct -j $jobid -X --noheader -o state | grep RUNNING | wc -l) -eq 0 ]]; do
    sleep 10
done
node=$(sacct -j $jobid -X --noheader -o nodelist | awk '{print $1}')

#
#  sometimes the listener on the compute node isn't immediately ready, so wait a second
#
sleep 1
echo start submission for job $jobid to node $node at $(date)

#
#  run 1000 jobs
#
export output=/beegfs/desy/user/$USER/processing/grid_scan.out
rm -f $output       # remove results of a previous run ($output is not visible inside the batch-job)

for i in {00001..01000} ; do 
    pars="nx=2463 ny=2527 pixel=0.172 detector_distance=200.0 X-ray_wavelength=1.03315 orgx=1259 orgy=1240 ix_min=0 ix_max=0 iy_min=0 iy_max=0 first_image_number=$i number_images=1 name_template_image=/beegfs/desy/user/$USER/processing/Images/TEST_?????.cbf library=/opt/xray/xdsapp3.0/plugins/xds-zcbf.so"
	# send all input parameters and the name of the output file to the compute node
    echo "$output $pars" | ncat $node.desy.de 12345  
done
exit

The actual batch-job (listener_v2.sh) launches GNU parallel and a netcat listener which feeds the input to GNU parallel:

#!/bin/bash
#SBATCH --time=0-00:30:00
#SBATCH --job-name=dozor-listener
#SBATCH --partition=all
unset LD_PRELOAD
rm -f out

#
#  create FIFO
#
mkdir -p /tmp/$USER
rm -f /tmp/$USER/loopFF
mkfifo /tmp/$USER/loopFF

#
#  launch gnu parallel listening to fifo 
#
while true; do cat /tmp/$USER/loopFF ; done | parallel -j $(nproc) --keep-order ./process_listener_v2.sh "{}" >> out &

#
#  start netcat listener on port 12345. netcat will write all input to fifo
#
ncat -k -t -l -p 12345 > /tmp/$USER/loopFF

The final script (process_listener_v2.sh) does the actual dozor run for each individual image:

#!/bin/bash
input=($1)
output=${input[0]}

#
#  create a temporary file serving as input to dozor
#
tmp_dozor=$(mktemp)

for var in ${input[@]:1} ; do
    echo $var | tr '=' ' ' >> $tmp_dozor
done
#
#  run dozor
#
/opt/xray/bin/dozor $tmp_dozor | grep -a '|' | grep '[0-9]' >> $output

rm -f $tmp_dozor    # clean up the temporary input file
exit

This setup took a little less than 30s to process the 1000 images. The batch-job won't terminate on its own, but it would be easy enough to send a signal to terminate it once all processing is done.
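One possible way (just a sketch, reusing the X_DONE_X idea from the filewatcher examples above; not part of the tested setup) is to send a marker line after the last image and let process_listener_v2.sh cancel the job itself, assuming scancel is available on the compute node:

#  on the submit node, after the last image has been sent:
echo "X_DONE_X" | ncat $node.desy.de 12345

#  in process_listener_v2.sh, before the dozor run:
#  (parallel catches SIGTERM, so dozor runs that are already in flight may still finish)
if [[ ${input[0]} == "X_DONE_X" ]]; then
    scancel $SLURM_JOB_ID
    exit
fi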

Using the same procedure, it would also be possible to receive the output from the batch-job through netcat and FIFOs on the batch submit node. Haven't tried it though ...
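Untested, but such a return channel could look roughly like the following: a second netcat listener on the submit host collects the results, and process_listener_v2.sh sends its filtered dozor output there instead of appending to a shared file (port 12346 and the use of $SLURM_SUBMIT_HOST are assumptions):

#  on the submit node: collect all results on port 12346
ncat -k -l -p 12346 >> /beegfs/desy/user/$USER/processing/grid_scan.out &

#  in process_listener_v2.sh: send the filtered output back instead of appending to $output
/opt/xray/bin/dozor $tmp_dozor | grep -a '|' | grep '[0-9]' | ncat $SLURM_SUBMIT_HOST 12346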