This is my first time doing such a thing intentionally. I believe everyone who has written shell scripts has already done some form of multiprocessing, for example: command &.
Here is example output from the script I am about to show you:
# Creating 10 test files at /tmp/test{0..9}.bin, each is 10MB
$ for ((i=0;i<10;i++)); do head -c 10m /dev/urandom > "/tmp/test${i}.bin"; done

# The single process
$ time for f in /tmp/test?.bin; do md5sum "$f"; done
a928dc064f3b0f68386ff8e8ae8c3d8e /tmp/test0.bin
59a2940703258a750a6895efbfead10e /tmp/test1.bin
77dc3bb2b0d70ada17174f73d9b8ba5b /tmp/test2.bin
e8be270104dc99d7fc842f6b1a8ed622 /tmp/test3.bin
dedd45d0f8168ed3c9ecbf4e7458ab87 /tmp/test4.bin
efaaa7064a849ab4f4dbd69153fcc11b /tmp/test5.bin
961520ac959d156a71d80628d001a20b /tmp/test6.bin
110185133ecc6b538b0c383295f3a137 /tmp/test7.bin
3f1901a68e828c7dfe16f1a84805dedc /tmp/test8.bin
a4032ebc7417b844fc58a841557c73a4 /tmp/test9.bin

real    0m0.426s
user    0m0.338s
sys     0m0.066s

# Multiprocessing with three processes to work on data
$ time ./mp.sh
W0: W0 started to work...
W1: W1 started to work...
W2: W2 started to work...
W0: a928dc064f3b0f68386ff8e8ae8c3d8e /tmp/test0.bin
W1: 59a2940703258a750a6895efbfead10e /tmp/test1.bin
W2: 77dc3bb2b0d70ada17174f73d9b8ba5b /tmp/test2.bin
W0: e8be270104dc99d7fc842f6b1a8ed622 /tmp/test3.bin
W1: dedd45d0f8168ed3c9ecbf4e7458ab87 /tmp/test4.bin
W2: efaaa7064a849ab4f4dbd69153fcc11b /tmp/test5.bin
W0: 961520ac959d156a71d80628d001a20b /tmp/test6.bin
W1: 110185133ecc6b538b0c383295f3a137 /tmp/test7.bin
W0: a4032ebc7417b844fc58a841557c73a4 /tmp/test9.bin
W2: 3f1901a68e828c7dfe16f1a84805dedc /tmp/test8.bin

real    0m0.265s
user    0m0.342s
sys     0m0.072s
The script mp.sh:
#!/bin/bash

MAX_WORKERS=3

worker () {
    echo "$1 started to work..."
    while read cmd; do
        # if it receives "exit", do any finishing jobs and quit
        [[ "$cmd" == "exit" ]] && break
        md5sum "$cmd"
    done
}

get_next () {
    (( q_id >= ${#queue[@]} )) && next='' && return 0
    next="${queue[q_id]}"
    ((q_id++))
    return
}

for ((i=0;i<MAX_WORKERS;i++)); do
    # bring up the workers; the redirection mutes this warning:
    # ./mp.sh: line 27: warning: execute_coproc: coproc [22652:W0] still exists
    eval "coproc W$i { worker W$i; }" &>/dev/null
done

queue=($(ls -1 /tmp/test?.bin))
q_id=0

while :; do
    for ((i=0;i<MAX_WORKERS;i++)); do
        w_stdout="W$i[0]"
        w_stdin="W$i[1]"
        read data <&${!w_stdout}
        if [[ ! -z "$data" ]]; then
            echo "W$i: $data"
            get_next
            [[ -z "$next" ]] && break
            echo "$next" >&${!w_stdin}
        fi
    done
    [[ -z "$next" ]] && break
done

# clean up
for ((i=0;i<MAX_WORKERS;i++)); do
    w_stdout="W$i[0]"
    w_stdin="W$i[1]"
    echo "exit" >&${!w_stdin}
    # get the rest of the data
    while read data; do
        echo "W$i: $data"
    done <&${!w_stdout}
    w_pid="W${i}_PID"
    wait ${!w_pid}
done
worker() is the data processor; the main program feeds it items from the queue array. The main program checks whether a worker has returned data and, if so, tries to get a new item for the worker that just finished processing.
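To see the worker protocol in isolation, you can feed worker() lines by hand. The snippet below only reuses the function from mp.sh verbatim; the name DEMO is just a label, and it assumes the /tmp/test*.bin files from the example above exist:

# worker() exactly as defined in mp.sh
worker () {
    echo "$1 started to work..."
    while read cmd; do
        [[ "$cmd" == "exit" ]] && break
        md5sum "$cmd"
    done
}

# one filename per line in, one checksum line out, "exit" ends the worker
printf '%s\n' /tmp/test0.bin /tmp/test1.bin exit | worker DEMO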
I use coproc to create a subshell for worker(). Note that I use eval, because coproc does not accept parameter expansion when supplying the name of the co-process, and we need to name the workers W0, W1, ..., and so on. You don't want to hard-code a fixed number of workers into the program.
In the main loop, you can see w_stdout and w_stdin, which use indirect expansion [1]; we need it to get the value of W#[0], where # is a digit. When using coproc, the co-process name is the key to its standard input, standard output, and process ID: they are ${NAME[1]}, ${NAME[0]}, and ${NAME_PID}, respectively. You can use read data <&${NAME[0]} ; echo "$data" to get the output of a co-process, and echo "blah blah blah" >&${NAME[1]} to feed it data.
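For example, here is a tiny self-contained session with a single named co-process; the name CHECKER and the saved PID variable are only for illustration, and it assumes /tmp/test0.bin from the earlier example exists:

#!/bin/bash
coproc CHECKER { while read -r f; do [[ "$f" == "exit" ]] && break; md5sum "$f"; done; }
checker_pid=$CHECKER_PID    # save the PID; bash may clear CHECKER_PID once the co-process exits

echo "/tmp/test0.bin" >&${CHECKER[1]}    # feed the co-process via ${NAME[1]}
read -r result <&${CHECKER[0]}           # read its answer via ${NAME[0]}
echo "got: $result"

echo "exit" >&${CHECKER[1]}              # let it finish gracefully
wait "$checker_pid"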
If you design a better protocol for communicating with the workers, this approach can do a lot more. You can even change a worker's command(s) whenever you need to. Currently, the only control message a worker accepts is exit, so workers can exit gracefully.
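For instance, a worker could dispatch on the first word of each line instead of treating the whole line as a filename. This is only a hypothetical extension of the protocol; the verbs md5 and sha1 are not something mp.sh implements:

worker () {
    echo "$1 started to work..."
    while read -r verb arg; do
        case "$verb" in
            exit) break ;;                         # finish gracefully, as before
            md5)  md5sum  "$arg" ;;                # hypothetical verb
            sha1) sha1sum "$arg" ;;                # hypothetical verb
            *)    echo "unknown command: $verb" ;;
        esac
    done
}

The main loop would then send lines like md5 /tmp/test0.bin or sha1 /tmp/test3.bin instead of a bare filename.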
[1] data=123 ; point_to=data ; echo "${!point_to}" echoes "123". You use ${!varname} to do such an expansion.