#!/usr/local/bin/perl
#
# Run plugins that need a long time, and feed the results to nagios as
# passive checks.  Run this from cron.
#
# FIXME: configurable minimum delay for retries
# This is for low frequency checks, ie 1/2 hour or slower.
#
# by Doke Scott, doke@udel.edu, 2008.5.14
#
# $Header: /home/doke/work/nagios/RCS/passive_checks,v 1.15 2015/12/21 15:58:31 doke Exp $
#

use warnings;
no warnings 'uninitialized';
use strict;
use Data::Dumper;
use Getopt::Long;
use Storable qw( freeze thaw );
use MIME::Base64;
use Socket;
use POSIX;
use Time::HiRes qw( usleep gettimeofday );

# Settings.  These are package globals because the queue runner and the
# forked workers read them too.  The config file and the command line may
# override the defaults below.
our $config_file             = "/usr/local/nagios/etc/localhost/passive_checks.cfg";
our $nagios_cmd_pipe_default = "/usr/local/nagios/var/rw/nagios.cmd";
our $nagios_cmd_pipe         = '';
our @send_nsca_cmds;
our $timeout     = 300;        # sanity time limit, seconds
our $max_workers = 6;
our $max_tries   = 6;
our $bufsize     = 1048576;
our $pattern     = '.';
our $show_stats  = 0;
our $noop        = 0;
our $verbose     = 0;
our $help        = 0;
our $timeout_cmdline;

$ENV{PATH} = "/usr/bin";

# States shared by jobs and workers in the queue runner.
our $status_new    = 1;
our $status_idle   = 2;
our $status_busy   = 3;
our $status_failed = 4;
our $status_done   = 5;

our ( $queue, %config, $alarm_timed_out, $start, $finish );

###########################

# Print the usage message and exit with the status given as first argument.
sub usage {
    print qq{Usage: $0 [-nsvh] [-f file] [-p pat] [-t n]
    -f file   read configuration from file [$config_file]
    -p pat    only do entries that match host/service pattern
    -s        show statistics
    -t n      timeout after n seconds
    -n        noop
    -v        verbose
    -h        this help
};
    exit $_[0];
}

Getopt::Long::Configure( "bundling" );
GetOptions(
    'f=s' => \$config_file,
    'p=s' => \$pattern,
    's+'  => \$show_stats,
    't=i' => \$timeout_cmdline,
    'n'   => \$noop,
    'v+'  => \$verbose,
    'h'   => \$help,
) or usage( 1 );    # GetOptions has already warned about the bad option
usage( 0 ) if ( $help );

if ( $verbose || $show_stats ) {
    $| = 1;
    $start = gettimeofday();
    printf "starting at %s\n", scalar( localtime( $start ) );
}

read_config( $config_file );
if ( $timeout_cmdline ) {
    # make cmdline timeout override config file and default
    $timeout = $timeout_cmdline;
}

do_checks();

if ( $verbose || $show_stats ) {
    $finish = gettimeofday();
    printf "finished at %s, elapsed %0.3f s\n",
        scalar( localtime( $finish ) ), $finish - $start;
}

exit 0;

##################################
# Read the config file into memory.
#
# Recognised lines:
#     send_nsca = <command>      (may be repeated)
#     pipe = <path>
#     timeout = <seconds>
#     max_workers = <n>
#     host service cmd           (a check to run)
# Blank lines and lines starting with '#' are ignored.  Checks whose
# "host/service" does not match $pattern are skipped.  Results go into
# %config, @send_nsca_cmds, $nagios_cmd_pipe, $timeout and $max_workers.
# Exits with status -1 if the file is missing, is not a plain file, has
# any of the permission bits 0133 set, or cannot be opened.
#
sub read_config {
    my ( $config_file ) = @_;

    if ( ! -e $config_file ) {
        print "no config file!\n";
        exit -1;
    }
    elsif ( ! -f _ ) {
        print "config file is not a plain file: $config_file\n";
        exit -1;
    }

    # "_" reuses the stat buffer filled by the -e test above
    my $mode = ( stat( _ ) )[ 2 ];
    if ( $mode & 0133 ) {
        print "config file permissions are too open: $config_file\n";
        exit -1;
    }

    my $fh;
    if ( ! open( $fh, '<', $config_file ) ) {
        print "can't read config file $config_file: $!\n";
        exit -1;
    }

    while ( my $line = <$fh> ) {
        chomp $line;
        if ( $line =~ m/^\s*#/ || $line =~ m/^\s*$/ ) {
            # ignore it
        }
        elsif ( $line =~ m/^ send_nsca \s* = \s* (\S.+) $/x ) {
            push @send_nsca_cmds, $1;
        }
        elsif ( $line =~ m/^ pipe \s* = \s* (\S+) \s*$/x ) {
            $nagios_cmd_pipe = $1;
        }
        elsif ( $line =~ m/^ timeout \s* = \s* (\d+) \s*$/x ) {
            $timeout = $1;
        }
        elsif ( $line =~ m/^ max_workers \s* = \s* (\d+) \s*$/x ) {
            $max_workers = $1;
        }
        elsif ( $line =~ m/^ (\S+) \s+ (\S+) \s+ (\S.*)$/x ) {
            my ( $host, $service, $cmd ) = ( $1, $2, $3 );
            next if ( $pattern && "$host/$service" !~ m/$pattern/i );
            if ( defined( $config{ $host }{ $service } ) ) {
                warn "warning $config_file line $. repeats $host $service, ignoring";
                next;
            }
            $config{ $host }{ $service } = $cmd;
        }
        else {
            warn "Warning: can't parse $config_file line $.";
        }
    }
    close $fh;

    if ( ! $nagios_cmd_pipe && ! @send_nsca_cmds ) {
        # no pipe nor send_nsca commands were defined
        $nagios_cmd_pipe = $nagios_cmd_pipe_default;
    }
}

# Queue one check_one job per configured host/service, in sorted order,
# then run the queue to completion.
sub do_checks {
    $queue = create_queue_non_threads();
    foreach my $host ( sort keys %config ) {
        foreach my $service ( sort keys %{ $config{ $host } } ) {
            my $cmd = $config{ $host }{ $service };
            $verbose && print "$host/$service $cmd\n";
            enqueue_non_threads( $queue, 'check_one', \&check_one_callback,
                $host, $service, $cmd );
        }
    }
    return queue_runner_non_threads( $queue );
}

# Called by workers for each job.
#
# Runs the plugin command $cmd, takes its first output line and exit
# status, and reports them for $host/$service through every configured
# send_nsca command and/or the nagios command pipe.
#
# Returns the plugin's exit status, incremented once for every delivery
# problem, so "0" means the plugin said OK and every report was handed
# over.  With -n (noop) nothing is sent.
#
# NOTE: the ALRM handler only sets a flag, and perl's :perlio layer
# retries reads interrupted by a signal (see perlipc), so the alarm does
# not abort a plugin that hangs while we read from it.
#
sub check_one {
    my ( $host, $service, $cmd ) = @_;

    $SIG{ALRM}       = \&alarm_handler;
    $alarm_timed_out = 0;

    my $rc        = 3;    # exit status of nagios plugin
    my $rc2       = 3;    # result status of both plugin and trying to send report to nagios
    my $firstline = 'no output from plugin';
    my $out;

    alarm( $timeout );    # set sanity timer for command execution
    if ( open( my $plugin, '-|', $cmd ) ) {
        while ( my $line = <$plugin> ) {
            $verbose && print ">$line";
            if ( $. == 1 ) {
                chomp $line;
                $firstline = $line;
            }
        }
        close $plugin;
        my $wait_status = $?;
        alarm( 0 );       # stop timer
        if ( $wait_status == -1 ) {
            # no usable wait status
            $rc = 3;
        }
        elsif ( $wait_status & 127 ) {
            # Killed by a signal: the exit code bits are 0, which would
            # otherwise be reported to nagios as OK.
            $firstline = sprintf "plugin died from signal %d", $wait_status & 127;
            $rc = 3;
        }
        else {
            $rc = $wait_status >> 8;
        }
    }
    else {
        alarm( 0 );       # stop timer
        $firstline = "unable to run '$cmd': $!";
        $rc = 3;
        warn $firstline . "\n";
    }
    if ( $alarm_timed_out ) {
        $firstline = "check cmd timed out";
        $rc = 3;
        $alarm_timed_out = 0;
    }
    $rc2 = $rc;

    foreach my $send_nsca_cmd ( @send_nsca_cmds ) {
        $out = sprintf "%s\t%s\t%d\t%s\n", $host, $service, $rc, $firstline;
        $verbose && print "out $out | $send_nsca_cmd";
        next if ( $noop );    # noop: show what would be sent, send nothing

        alarm( $timeout );    # timeout for nsca command
        if ( open( my $nsca, '|-', $send_nsca_cmd ) ) {
            print {$nsca} $out;
            # a failing command only shows up when the pipe is closed
            if ( ! close $nsca ) {
                warn "$send_nsca_cmd failed: status $?, $!\n";
                $rc2++;
            }
            alarm( 0 );       # stop timer
        }
        else {
            alarm( 0 );       # stop timer
            warn "can't run $send_nsca_cmd: $!\n";
            $rc2++;
        }
    }

    if ( $nagios_cmd_pipe ) {
        $out = sprintf "[%lu] PROCESS_SERVICE_CHECK_RESULT;%s;%s;%d;%s\n",
            time(), $host, $service, $rc, $firstline;
        $verbose && print "out $out";

        # append results to nagios command file
        # first make certain it exists, and that it's a pipe
        if ( ! -e $nagios_cmd_pipe || ! -p $nagios_cmd_pipe ) {
            sleep 10;    # give nagios some time to restart
            $rc2++;
            return $rc2;
        }

        alarm( $timeout );    # timeout for cmd pipe write
        my $cmd_pipe;
        if ( ! open( $cmd_pipe, '>', $nagios_cmd_pipe ) ) {
            alarm( 0 );       # stop timer
            warn "can't open $nagios_cmd_pipe: $!\n";
            $rc2++;
            return $rc2;
        }
        if ( ! $noop ) {
            print {$cmd_pipe} $out;
        }
        close $cmd_pipe;
        alarm( 0 );           # stop timer
    }
    if ( $alarm_timed_out ) {
        $firstline = "timed out while writing cmdpipe";
        $rc2++;
        $alarm_timed_out = 0;
    }

    return $rc2;
}

# called by master to check the worker's result for each job
# return 0 if ok, non-zero if bad
sub check_one_callback {
    my ( $buf, @params ) = @_;
    return ( $buf eq '0' ) ? 0 : 1;
}

################################################################
# Doke's 2nd non-threaded parallel queue runner
#
# This spawns multiple worker processes, not threads, that
# work their way through a queue of jobs.
#
# FIXME: turn this into a module
#

# Create an empty job queue, sized from $max_workers and $max_tries.
# Returns a hash ref holding the job list, the worker table and the id
# sequences.
sub create_queue_non_threads {
    my $queue = {};
    $verbose && print "create_queue_non_threads()\n";
    $queue->{ 'max_workers' }   = $max_workers;
    $queue->{ 'max_tries' }     = $max_tries;
    $queue->{ 'jobs' }          = [];    # job info, indexed by job id
    $queue->{ 'job_id_seq' }    = 1;     # start at 1, 0 is a sentinel
    $queue->{ 'job_cursor' }    = 1;     # next job to try assigning
    $queue->{ 'workers' }       = {};    # worker id -> worker info
    $queue->{ 'worker_id_seq' } = 1;
    $queue->{ 'nrunning' }      = 0;     # number of workers running
    return $queue;
}

# Append a job to the queue.
#   $command   name sent to the worker in the "start job" line
#   $callback  optional code ref, called as $callback->( $output, @params );
#              it must return 0 if the job succeeded
#   @params    job parameters, sent to the worker joined with ', '
# Returns the new job record.
sub enqueue_non_threads {
    my ( $queue, $command, $callback, @params ) = @_;

    $verbose && print "enqueue_non_threads( $queue, $command, $callback, @params )\n";
    my $jid = $queue->{ 'job_id_seq' }++;
    $verbose && print "enqueuing job $jid, @_\n";

    my $job = {
        'jid'      => $jid,
        'command'  => $command,
        'callback' => $callback,
        'params'   => \@params,
        'status'   => $status_new,
        'tries'    => 0,
        'wid'      => 0,
    };
    $queue->{ 'jobs' }->[ $jid ] = $job;
    return $job;
}

# True if $job exists and should be handed to a worker: it is new, or it
# failed and still has tries left.
sub _job_is_waiting {
    my ( $queue, $job ) = @_;
    return 0 if ( ! $job );
    return 1 if ( $job->{ status } == $status_new );
    return 1 if ( $job->{ status } == $status_failed
        && $job->{ tries } < $queue->{ max_tries } );
    return 0;
}

# Run every job in the queue.
#
# Forks up to max_workers worker processes, hands each idle worker one
# job at a time over a socketpair, and reads back one result line per
# job.  A job's callback decides whether the result counts as success;
# failed jobs are retried until they have been tried max_tries times.
# Workers that have not reported for $timeout seconds are sent HUP, then
# TERM, then KILL, one $timeout apart.  Returns when no job is waiting
# and no worker is busy.
#
# NOTE(review): a job whose worker is killed for timing out is left
# marked busy and is not retried; this is unchanged from the original —
# confirm it is intended.
sub queue_runner_non_threads {
    my ( $queue ) = @_;

    $verbose && print "queue_runner_non_threads( $queue )\n";
    my $start = gettimeofday();
    $verbose && printf "starting at %.3f\n", $start;

    # These counters are kept roughly up to date between recounts.
    my $njobs         = 0;
    my $njobs_waiting = 0;
    my $njobs_busy    = 0;
    my $njobs_done    = 0;
    my $nworkers      = 0;
    my $nworkers_busy = 0;
    my $nworkers_idle = 0;
    my $loopcount     = 0;

    MAIN_LOOP:
    while ( 1 ) {

        # only recount every 10 times through the main loop,
        # to reduce cpu
        if ( --$loopcount <= 0 ) {
            $loopcount = 10;

            # we have to recount the jobs, because the callbacks can alter the job queue
            $verbose && print "pre-count njobs $njobs, waiting $njobs_waiting, busy $njobs_busy, done $njobs_done\n";
            my $last_jid = $#{ $queue->{ jobs } };    # slot 0 is the unused sentinel
            $njobs         = ( $last_jid > 0 ) ? $last_jid : 0;
            $njobs_waiting = 0;
            $njobs_busy    = 0;
            $njobs_done    = 0;
            foreach my $jid ( 1 .. $last_jid ) {
                my $job = $queue->{ jobs }->[ $jid ];
                next if ( ! $job );
                if ( _job_is_waiting( $queue, $job ) ) {
                    $njobs_waiting++;
                }
                elsif ( $job->{ status } == $status_busy ) {
                    $njobs_busy++;
                }
                elsif ( $job->{ status } == $status_done ) {
                    $njobs_done++;
                }
            }
            $verbose && print "njobs $njobs, waiting $njobs_waiting, busy $njobs_busy, done $njobs_done\n";

            # count workers
            $verbose && print "pre-count nworkers $nworkers, busy $nworkers_busy, idle $nworkers_idle\n";
            $nworkers      = 0;
            $nworkers_busy = 0;
            $nworkers_idle = 0;
            foreach my $wid ( keys %{ $queue->{ workers } } ) {
                my $worker = $queue->{ workers }{ $wid };
                if ( $worker->{ status } == $status_busy ) {
                    $nworkers++;
                    $nworkers_busy++;
                }
                elsif ( $worker->{ status } == $status_idle ) {
                    $nworkers++;
                    $nworkers_idle++;
                }
                # else it's done
            }
            $verbose && print "nworkers $nworkers, busy $nworkers_busy, idle $nworkers_idle\n";

            if ( $njobs_waiting == 0 && $nworkers_busy == 0 ) {
                # no jobs waiting, and no workers busy
                last MAIN_LOOP;
            }
        }

        # do any workers have results for us?
        my $got_partial_output = 0;

        # build the select vector; reaped workers have no pipe any more
        my $rin = '';
        foreach my $wid ( keys %{ $queue->{ workers } } ) {
            my $worker = $queue->{ workers }{ $wid };
            if ( $worker->{ 'pipe' } ) {
                vec( $rin, fileno( $worker->{ 'pipe' } ), 1 ) = 1;
            }
        }
        my $rout;
        my $nfound = select( $rout = $rin, undef, undef, 0.1 );
        $verbose > 1 && print "select found $nfound\n";

        if ( $nfound > 0 ) {
            # figure out which ones had output
            WORKER:
            foreach my $wid ( keys %{ $queue->{ workers } } ) {
                my $worker = $queue->{ workers }{ $wid };
                next WORKER if ( ! $worker->{ 'pipe' } );
                next WORKER if ( ! vec( $rout, fileno( $worker->{ 'pipe' } ), 1 ) );

                # read it, appending to whatever partial line we already hold
                $alarm_timed_out = 0;
                $SIG{ALRM} = \&alarm_handler;
                alarm( 3 );
                my $nread = sysread( $worker->{ 'pipe' }, $worker->{ 'buf' },
                    $bufsize, length( $worker->{ 'buf' } ) );
                alarm( 0 );
                if ( $alarm_timed_out ) {
                    warn "timed out in read from worker $wid\n";
                }
                next WORKER if ( ! $nread );    # error or end of file
                $verbose && print "got data from worker $wid\n";

                # child is supposed to respond with "job $jid, time $time, $output\n";
                if ( $worker->{ buf } !~ m/\n/ ) {
                    # don't have an entire line yet.
                    $verbose && print "worker $wid returned partial output, continuing\n";
                    $got_partial_output = 1;
                    next WORKER;
                }
                $verbose > 1 && print "child $worker->{ wid } pid $worker->{ pid } said: '$worker->{ buf }'";

                # if this gets out of sync, we might have multiple lines of output?
                LINE:
                while ( $worker->{ buf } =~ m/(.*?)\n(.*)/s ) {
                    my $line = $1;
                    $worker->{ 'buf' } = $2;
                    $verbose && print "got a complete line from worker $wid\n";

                    if ( $line =~ m/^job (\d+), time ([\d\.]+), (.*)/ ) {
                        my ( $jid, $time, $out ) = ( $1, $2, $3 );
                        if ( $jid != $worker->{ 'jid' } ) {
                            warn "worker $worker->{ wid } returned result for job $jid,",
                                " when it was supposed to be working on job $worker->{ jid }\n";
                        }
                        $worker->{ 'time' } += $time;
                        $verbose > 1 && print "processing job $jid output '$out'\n";
                        if ( ! $jid ) {
                            warn "invalid jid $jid\n";
                            next LINE;
                        }
                        my $job = $queue->{ jobs }->[ $jid ];
                        if ( ! $job ) {
                            warn "invalid job $jid\n";
                            next LINE;
                        }
                        if ( $job->{ 'status' } != $status_busy ) {
                            if ( $job->{ 'status' } == $status_done ) {
                                warn "got output for job $jid, but it's already done.\n";
                                # how bad is this?
                            }
                            else {
                                warn "got output for job $jid, but it's not marked busy.\n";
                                # how bad is this?
                            }
                        }

                        # did the job have a callback?
                        # if so, call it to decide if the job worked
                        # callback will return 0 on success
                        my $failed = 0;
                        if ( $job->{ callback } ) {
                            my $callback = $job->{ callback };
                            $failed = $callback->( $out, @{ $job->{ 'params' } } );
                            $verbose && print( $failed
                                ? "job $jid callback failed \n"
                                : "job $jid callback succeeded\n" );
                        }
                        # no callback: assume it worked

                        if ( ! $failed ) {
                            if ( $job->{ status } == $status_busy ) {
                                $njobs_busy--;
                                $njobs_done++;
                            }
                            $job->{ status } = $status_done;
                        }
                        else {
                            $job->{ status } = $status_failed;
                            # tries never exceeds max_tries, so ">" could not fire
                            if ( $job->{ tries } >= $queue->{ max_tries } ) {
                                # too many failures
                                warn "job $jid failed too many times\n";
                            }
                        }
                    }
                    elsif ( $line =~ m/^\s*$/ ) {
                        # ignore blank line
                    }
                    else {
                        # unparseable line
                        warn "worker $worker->{ wid } returned malformed output '$line'\n";
                        my $jid = $worker->{ jid };
                        my $job = $jid ? $queue->{ jobs }->[ $jid ] : undef;
                        if ( $job && $job->{ status } == $status_busy ) {
                            warn "marking job $jid failed\n";
                            $job->{ status } = $status_failed;
                            $njobs_busy--;
                        }
                        elsif ( $jid ) {
                            # this isn't supposed to happen
                            warn "worker $worker->{ wid } was working on job $jid, but job is not busy\n";
                        }
                        else {
                            # this isn't supposed to happen
                            warn "worker $worker->{ wid } wasn't supposed to be working on any job\n";
                        }
                    }
                }

                $worker->{ status }        = $status_idle;
                $worker->{ last_reported } = gettimeofday();
                $nworkers_busy--;
                $nworkers_idle++;
                $nfound--;
            }
        }

        # any to timeout?
        # used to do this with a signal handler, but can't use sigalrm
        # and select
        my $ago  = gettimeofday() - $timeout;
        my $ago2 = $ago - $timeout;
        my $ago3 = $ago2 - $timeout;
        if ( $verbose > 1 ) {
            printf "now %f\n", gettimeofday();
            print "ago $ago\n";
            print "ago2 $ago2\n";
            print "ago3 $ago3\n";
        }
        foreach my $wid ( keys %{ $queue->{ workers } } ) {
            my $worker = $queue->{ workers }{ $wid };
            $verbose > 1 && print "worker $wid, $worker->{ last_reported }\n";

            # never signal a pid we have already reaped: it may have been
            # reused by an unrelated process
            next if ( $worker->{ reaped } );
            next if ( $worker->{ status } != $status_busy
                && $worker->{ status } != $status_done );

            my ( $signame, $signum );
            if ( $worker->{ last_reported } < $ago3 ) {
                ( $signame, $signum ) = ( 'KILL', 9 );
            }
            elsif ( $worker->{ last_reported } < $ago2 ) {
                ( $signame, $signum ) = ( 'TERM', 15 );
            }
            elsif ( $worker->{ last_reported } < $ago ) {
                ( $signame, $signum ) = ( 'HUP', 1 );
            }
            next if ( ! $signum );

            warn "SIG${signame}ing child $wid pid $worker->{ pid }\n";
            kill $signum, $worker->{ pid };
            if ( $worker->{ status } == $status_busy ) {
                # it no longer counts as a running worker
                $nworkers_busy--;
                $nworkers--;
            }
            $worker->{ status } = $status_done;
        }

        # did any workers die, or get killed?
        # see if any are done
        foreach my $wid ( keys %{ $queue->{ workers } } ) {
            my $worker = $queue->{ workers }{ $wid };
            next if ( $worker->{ reaped } );
            my $pid = waitpid $worker->{ pid }, WNOHANG;
            next if ( $pid != $worker->{ pid } );

            # freed one
            my $status = $?;
            $verbose && print "worker $wid pid $pid exited with status $status\n";

            # drain any output from the pipe
            my $drain_in = '';
            vec( $drain_in, fileno( $worker->{ 'pipe' } ), 1 ) = 1;
            my $drain_out;
            my $ndrain = select( $drain_out = $drain_in, undef, undef, 0.25 );
            if ( $ndrain > 0 && vec( $drain_out, fileno( $worker->{ 'pipe' } ), 1 ) ) {
                my $buf;
                my $nread = sysread $worker->{ 'pipe' }, $buf, $bufsize;
                if ( $nread ) {
                    print "child $wid pid $worker->{ pid } said: $buf";
                }
                # since worker died, assume job failed, don't try to do the callback
            }
            close $worker->{ 'pipe' };
            $worker->{ 'pipe' }   = undef;    # keeps it out of later select vectors
            $worker->{ 'reaped' } = 1;        # keeps it from being waited for or signalled again

            if ( $worker->{ 'status' } == $status_busy ) {
                $verbose && print "worker $wid pid $pid exited while busy, with status $status\n";
                my $jid = $worker->{ 'jid' };
                my $job = $jid ? $queue->{ jobs }->[ $jid ] : undef;
                if ( $jid ) {
                    $verbose && print "worker $wid pid $pid was working on jid $jid\n";
                }
                if ( $job ) {
                    $verbose && print "worker pid $pid, jid $jid, params ", @{ $job->{ params } }, "\n";
                    # since worker died, assume job failed
                    $job->{ status } = $status_failed;
                    if ( $job->{ tries } >= $queue->{ max_tries } ) {
                        # too many failures
                        warn "job $jid failed too many times\n";
                    }
                }
                $nworkers_busy--;
                $nworkers--;
            }
            elsif ( $worker->{ 'status' } == $status_idle ) {
                $nworkers_idle--;
                $nworkers--;
            }
            # a worker already marked done was uncounted when it was signalled
            $worker->{ status } = $status_done;
        }

        # any jobs to send?
        $verbose > 1 && print "njobs_waiting $njobs_waiting\n";
        if ( $njobs_waiting > 0 ) {

            # do we need to start more workers?
            $verbose > 1 && print "nworkers $nworkers, max_workers $queue->{ max_workers }\n";
            if ( $nworkers < $queue->{ max_workers } ) {
                $verbose > 1 && print "nworkers_idle $nworkers_idle\n";
                my $n2start = min( ( $queue->{ max_workers } - $nworkers ),
                    ( $njobs_waiting - $nworkers_idle ) );
                $verbose > 1 && print "n2start $n2start\n";

                for my $i ( 1 .. $n2start ) {
                    # start a new worker
                    my $wid = $queue->{ 'worker_id_seq' }++;
                    $verbose && print "starting new worker $wid\n";

                    # linux wants PF_UNIX, solaris wants PF_NONSPEC
                    # protocol 0 works on both
                    my ( $pipe2child, $pipe2parent );
                    if ( ! socketpair( $pipe2child, $pipe2parent, AF_UNIX, SOCK_STREAM, 0 ) ) {
                        warn "can't open socketpair: $!\n";
                        usleep( 10000 );
                        next;
                    }

                    # deliver commands and results immediately, don't buffer
                    my $oldfh = select( $pipe2child );
                    $| = 1;
                    select( $pipe2parent );
                    $| = 1;
                    select( $oldfh );

                    my $pid = fork;
                    if ( ! defined $pid ) {
                        # fork failed; fork returns undef, which must not
                        # be mistaken for the child's 0
                        warn "fork failed: $!\n";
                        close $pipe2child;
                        close $pipe2parent;
                    }
                    elsif ( $pid ) {
                        # fork worked, in parent
                        close $pipe2parent;
                        $queue->{ 'workers' }{ $wid } = {
                            'wid'           => $wid,
                            'pid'           => $pid,
                            'jid'           => 0,
                            'pipe'          => $pipe2child,
                            'buf'           => '',
                            'status'        => $status_idle,
                            'last_reported' => scalar( gettimeofday() ),
                            'time'          => 0,
                        };
                        $nworkers++;
                        $nworkers_idle++;
                    }
                    else {
                        # fork worked, in child
                        close $pipe2child;
                        # drop the inherited parent-side sockets of the other workers
                        foreach my $sibling ( values %{ $queue->{ workers } } ) {
                            close $sibling->{ 'pipe' } if ( $sibling->{ 'pipe' } );
                        }
                        my $rc = worker_non_threads( $wid, $pipe2parent );
                        close $pipe2parent;
                        exit $rc;
                    }
                }
            }

            # any idle workers to send jobs to?
            if ( $nworkers > $nworkers_busy ) {
                foreach my $wid ( keys %{ $queue->{ workers } } ) {
                    my $worker = $queue->{ workers }{ $wid };
                    next if ( $worker->{ 'status' } != $status_idle );

                    # find a waiting job, starting at the cursor and wrapping around
                    my $cursor   = $queue->{ 'job_cursor' };
                    my $last_jid = $queue->{ 'job_id_seq' } - 1;
                    foreach my $jid ( $cursor .. $last_jid, 1 .. $cursor - 1 ) {
                        my $job = $queue->{ jobs }->[ $jid ];
                        next if ( ! _job_is_waiting( $queue, $job ) );

                        # ok, it's a waiting job
                        $verbose && print "assigning job $job->{ jid } to worker $worker->{ wid }\n";
                        $job->{ wid }    = $wid;
                        $job->{ status } = $status_busy;
                        $job->{ tries }++;
                        $worker->{ jid }             = $jid;
                        $worker->{ status }          = $status_busy;
                        $worker->{ 'last_reported' } = gettimeofday();
                        my $pipe2child = $worker->{ 'pipe' };
                        printf $pipe2child "start job %d, %s, %s\n", $jid,
                            $job->{ command }, join( ', ', @{ $job->{ 'params' } } );

                        $njobs_busy++;
                        $njobs_waiting--;
                        $nworkers_busy++;
                        $nworkers_idle--;
                        if ( ++( $queue->{ 'job_cursor' } ) >= $queue->{ 'job_id_seq' } ) {
                            $queue->{ 'job_cursor' } = 1;
                        }
                        last;
                    }
                }
            }
        }

        $verbose && print "\n";
        # poll again sooner when a worker is in the middle of a line
        usleep( $got_partial_output ? 10000 : 100000 );
    }
    $verbose && print "main loop done\n";

    if ( $show_stats || $verbose ) {
        my $finish = gettimeofday();
        printf "njobs %d, done %d, failed %d\n", $njobs, $njobs_done, $njobs - $njobs_done;

        my $ntries = 0;
        foreach my $jid ( 1 .. $queue->{ 'job_id_seq' } - 1 ) {
            my $job = $queue->{ jobs }->[ $jid ];
            $ntries += $job->{ 'tries' } if ( $job );
        }
        printf "ntries %d, avg tries per job %0.3f\n", $ntries,
            ( $njobs ) ? $ntries / $njobs : 0;

        my $total_time = 0;
        foreach my $wid ( keys %{ $queue->{ workers } } ) {
            $total_time += $queue->{ workers }{ $wid }->{ time };
        }
        my $elapsed = $finish - $start;
        printf "queuer done at %0.3f, %0.3f seconds elapsed\n", $finish, $elapsed;
        printf "%0.3f seconds per job\n", ( $njobs )  ? $elapsed / $njobs  : 0;
        printf "%0.3f seconds per try\n", ( $ntries ) ? $elapsed / $ntries : 0;
        printf "speedup %0.3f / %0.3f = %0.3f\n", $total_time, $elapsed,
            ( $elapsed ) ? $total_time / $elapsed : 0;
    }
    return;
}

# Body of a worker process.
#
# Reads command lines from $pipe until end of file or "terminate":
#     start job <jid>, <command>, <host>, <service>, <cmd>
# Each job is run through check_one() and answered on $pipe with
#     job <jid>, time <seconds>, <result>
# The <command> field is parsed but not used: check_one is always called.
# Returns 0, which the caller uses as the process exit status.
sub worker_non_threads {
    my ( $wid, $pipe ) = @_;

    $verbose && print "worker_non_threads( $wid ) pid $$\n";
    srand( ( $wid << 16 ) + $$ );

    while ( my $line = <$pipe> ) {
        chomp $line;
        if ( $line =~ m/^start job (\d+), (\S*), (.*)/i ) {
            my ( $jid, $command, $rest ) = ( $1, $2, $3 );
            # limit 3 keeps any commas inside the plugin command intact
            my @params = split( m/, */, $rest, 3 );
            my ( $host, $service, $cmd ) = @params;
            $verbose && print "worker $wid, pid $$, starting job $jid, cmd $cmd, params @params\n";

            my $start = gettimeofday();
            $alarm_timed_out = 0;
            $SIG{ALRM} = \&alarm_handler;
            # check_one sets its own alarms, which replace this one
            alarm( $timeout * 0.85 );
            my $out = check_one( $host, $service, $cmd );
            alarm( 0 );
            $out = '' if ( ! defined $out );
            chomp $out;
            if ( $alarm_timed_out ) {
                $out = "failed timeout, $out";
            }
            $verbose && print "worker $wid, pid $$, job $jid, params '@params', result '$out'\n";

            my $elapsed = gettimeofday() - $start;
            printf $pipe "job %d, time %0.6f, %s\n", $jid, $elapsed, $out;
            $verbose && printf "worker $wid, pid $$, job $jid, time %0.3f s\n", $elapsed;
        }
        elsif ( $line =~ m/^terminate\s*$/i ) {
            $verbose && print "worker $wid, pid $$ got terminate command\n";
            last;
        }
        elsif ( $line =~ m/^\s*$/i ) {
            # ignore blank lines
        }
        else {
            warn "worker $wid, pid $$ received unknown command line '$line'\n";
        }
    }

    $verbose && print "worker $wid, pid $$ returning\n";
    return 0;
}

# SIGALRM handler: note the timeout in $alarm_timed_out and carry on.
sub alarm_handler {
    warn "alarm timeout in pid $$\n";
    $alarm_timed_out = 1;
}

# Shut a queue down: ask every worker to terminate, then send HUP, TERM
# and KILL, 0.1 s apart, to any that have not exited, and empty the queue.
sub destroy_queue_non_threads {
    my ( $queue ) = @_;

    $verbose && print "destroy_queue_non_threads( $queue )\n";

    # a worker may already be gone; don't let the write kill us
    local $SIG{PIPE} = 'IGNORE';

    foreach my $wid ( keys %{ $queue->{ workers } } ) {
        my $pipe2child = $queue->{ workers }{ $wid }->{ 'pipe' };
        next if ( ! $pipe2child );
        print $pipe2child "terminate\n";
        close $pipe2child;
    }

    for my $sig ( 1, 15, 9 ) {
        usleep( 100000 );
        foreach my $wid ( keys %{ $queue->{ workers } } ) {
            my $worker = $queue->{ workers }{ $wid };
            if ( $worker->{ reaped } ) {
                # already waited for by the queue runner
                delete $queue->{ workers }{ $wid };
                next;
            }
            my $pid = waitpid $worker->{ pid }, WNOHANG;
            if ( $pid ) {
                # it's gone (or was never ours): forget it, and don't
                # signal a pid that no longer belongs to us
                delete $queue->{ workers }{ $wid };
                next;
            }
            kill $sig, $worker->{ pid };
        }
    }

    %{ $queue->{ workers } } = ();
    @{ $queue->{ jobs } }    = ();    # jobs is an array, not a hash
    %{ $queue } = ();
    return;
}

# Return the first line of $passfile, without its newline.
# Dies if the file cannot be opened.
sub getpass {
    my ( $passfile ) = @_;
    open( my $fh, '<', $passfile ) or die( "Can't open $passfile: $!\n" );
    my $pw = <$fh>;
    close( $fh );
    chomp( $pw ) if ( defined $pw );
    return( $pw );
}

# Return the numerically smaller of two values.
sub min {
    my ( $x, $y ) = @_;
    return ( $x <= $y ) ? $x : $y;
}