#!/usr/bin/env python
'''Watch an EPICS PV. Send email when it changes from 0 to 1'''
# Copyright (c) 2014-2015, UChicago Argonne, LLC. See LICENSE file.
# docs: http://PvMail.readthedocs.org
import argparse
import datetime
import epics
import logging
import os
import socket
import sys
import threading
import time
import ini_config
import mailer
import __init__
LOG_FILE = "pvMail-%d.log" % os.getpid()
RETRY_INTERVAL_S = 0.2
CHECKPOINT_INTERVAL_S = 5 * 60.0
CONNECTION_TEST_TIMEOUT = 0.5
gui_object = None
[docs]class PvMail(threading.Thread): # lgtm [py/missing-call-to-init]
'''
Watch an EPICS PV (using PyEpics interface) and send an email
when the PV changes from 0 to 1.
'''
def __init__(self, config=None):
self.trigger = False
self.message = "default message"
self.subject = "pvMail.py"
self.triggerPV = ""
self.messagePV = ""
self.recipients = []
self.old_value = None
self.ca_timestamp = None
self.config = config
self.running = False
self.pv = dict(trigger=None, message=None)
self.pv_cb_index = dict(trigger=None, message=None)
[docs] def basicChecks(self):
'''
check for valid inputs,
raise exceptions as discovered,
otherwise no return result
'''
if len(self.recipients) == 0:
msg = "need at least one email address for list of recipients"
raise RuntimeWarning, msg
parts = {'message': self.messagePV,
'trigger': self.triggerPV}
for key, pv in parts.items():
if len(pv) == 0:
raise RuntimeWarning, "no EPICS PV name for the %s PV" % key
if self.pv[key] is None:
if self.testConnect(pv, timeout=CONNECTION_TEST_TIMEOUT) is False:
fmt = "could not connect to %s PV: %s"
raise RuntimeWarning, fmt % (key, pv)
else:
if not self.pv[key].connected:
fmt = "could not connect to %s PV: %s"
raise RuntimeWarning, fmt % (key, pv)
[docs] def testConnect(self, pvname, timeout=5.0):
'''
create PV,
wait for connection,
return connection state (True | False)
adapted from PyEpics __createPV() method
'''
logger("test connect with %s" % pvname)
retry_interval_s = 0.0001
start_time = time.time()
thispv = epics.PV(pvname)
thispv.connect()
while not thispv.connected:
time.sleep(retry_interval_s)
epics.ca.poll()
if time.time()-start_time > timeout:
break
return thispv.connected
[docs] def do_start(self):
'''start watching for triggers'''
logger("do_start")
if not self.running:
self.basicChecks()
logger("passed basicChecks(), starting monitors")
handler_list = [
['message', self.messagePV, self.receiveMessageMonitor],
['trigger', self.triggerPV, self.receiveTriggerMonitor]
]
for key, pvname, cb in handler_list:
pv = epics.PV(pvname)
self.pv[key] = pv
self.pv_cb_index[key] = pv.add_callback(cb)
self.old_value = self.pv['trigger'].get()
self.message = self.pv['message'].get()
logger("PVs connected")
self.running = True
[docs] def do_stop(self):
'''stop watching for triggers'''
logger("do_stop")
if self.running:
for key, pv in self.pv.items():
if pv is not None and pv.connected:
pv.remove_callback( self.pv_cb_index[key] )
pv.disconnect()
self.pv[key] = None
self.pv_cb_index[key] = None
logger("PVs disconnected")
self.running = False
[docs] def do_restart(self):
'''restart watching for triggers'''
self.do_stop()
self.do_start()
[docs] def receiveMessageMonitor(self, value, **kw):
'''respond to EPICS CA monitors on message PV'''
logger("%s = %s" % (self.messagePV, value))
self.message = value
[docs] def receiveTriggerMonitor(self, value, **kw):
'''respond to EPICS CA monitors on trigger PV'''
logger("%s = %s" % (self.triggerPV, value))
#print self.old_value, type(self.old_value), value, type(value)
if self.old_value == 0 and value == 1:
self.trigger = True # set email trigger flag
pv = self.pv['trigger']
#print pv.pvname, value,pv.timestamp
self.ca_timestamp = pv.timestamp
# send the message in a different thread
#SendMessage(self, self.config)
t = threading.Thread(target=SendMessage, args=(self, self.config))
t.start()
self.old_value = value
[docs]def SendMessage(pvm, agent_db, reporter=None):
'''
construct and send the message
:param obj pvm: instance of PvMail object on which to report
'''
#print "SendMessage", type(pvm), pvm
logger("SendMessage")
pvm.trigger = False # triggered event received
agent_db = agent_db or ini_config.Config()
email_agent_dict = dict(sendmail=mailer.sendMail_sendmail, SMTP=mailer.sendMail_SMTP)
emailer = email_agent_dict[agent_db.mail_transfer_agent]
try:
_send(emailer, pvm, agent_db, logger=logger)
except Exception, exc:
msg = 'problem sending email: ' + str(exc)
logger(msg)
[docs]def getUserName(db):
u1 = os.environ.get('LOGNAME', None)
u2 = os.environ.get('USERNAME', None)
u3 = db.get()['user']
return u1 or u2 or u3
def _send(emailer, pvm, agent_db, reporter=None, logger=None):
pvm.basicChecks()
pvm.subject = "pvMail.py: " + pvm.triggerPV
msg = '' # start with a new message
msg += "\n\n"
msg += epics.caget(pvm.messagePV)
msg += "\n\n"
username = getUserName(agent_db)
msg += 'user: %s\n' % username
msg += 'host: %s\n' % socket.gethostname()
msg += 'date: %s (UNIX, not PV)\n' % datetime.datetime.now()
if hasattr(pvm, "ca_timestamp"):
msg += 'CA_timestamp: %d\n' % pvm.ca_timestamp
else:
msg += 'CA_timestamp: not available\n'
msg += 'program: %s\n' % sys.argv[0]
msg += 'PID: %d\n' % os.getpid()
msg += 'trigger PV: %s\n' % pvm.triggerPV
msg += 'message PV: %s\n' % pvm.messagePV
msg += 'recipients: %s\n' % ", ".join(pvm.recipients)
pvm.message = msg
if logger is not None:
logger('#'*60)
logger(pvm.message)
logger('#'*60)
emailer(pvm.subject, msg, pvm.recipients, agent_db.get(), logger=logger)
logger("message(s) sent")
[docs]def logger(message):
'''
log a message or report from PvMail
:param str message: words to be logged
'''
now = datetime.datetime.now()
name = os.path.basename(sys.argv[0])
pid = os.getpid()
text = "(%d,%s,%s) %s" % (pid, name, now, message)
logging.info(text)
[docs]def cli(results, config=None):
'''
command-line interface to the PvMail class
:param obj results: default parameters from argparse, see main()
:param obj config: email configuration from ini_config.Config()
'''
logging_interval = min(60*60, max(5.0, results.logging_interval))
sleep_duration = min(5.0, max(0.0001, results.sleep_duration))
pvm = PvMail(config)
pvm.triggerPV = results.trigger_PV
pvm.messagePV = results.message_PV
pvm.recipients = results.email_addresses.strip().split(",")
checkpoint_time = time.time()
pvm.do_start()
while True: # endless loop, kill with ^C or equal
if time.time() > checkpoint_time:
checkpoint_time += logging_interval
logger("checkpoint")
if pvm.trigger:
SendMessage(pvm, config)
logger('trigger received, sending email')
time.sleep(sleep_duration)
# pvm.do_stop() # this will never be called
[docs]def gui(results, config=None):
'''
graphical user interface to the PvMail class
:param obj results: default parameters from argparse, see main()
:param obj config: email configuration from ini_config.Config()
'''
import uic_gui
uic_gui.main(
results.trigger_PV,
results.message_PV,
results.email_addresses.strip().split(","),
logger=logger,
logfile=results.log_file,
config=config,
)
[docs]def main():
'''parse command-line arguments and choose which interface to use'''
version = __init__.__version__
doc = 'v' + version + ', ' + __doc__.strip()
parser = argparse.ArgumentParser(description=doc)
# positional arguments
# not required if GUI option is selected
parser.add_argument('trigger_PV', action='store', nargs='?',
help="EPICS trigger PV name", default="")
parser.add_argument('message_PV', action='store', nargs='?',
help="EPICS message PV name", default="")
parser.add_argument('email_addresses', action='store', nargs='?',
help="email address(es), comma-separated if more than one",
default="")
# optional arguments
parser.add_argument('-l', action='store', dest='log_file',
help="for logging program progress and comments",
default=LOG_FILE)
parser.add_argument('-i', action='store', dest='logging_interval',
type=float,
help="checkpoint reporting interval (s) in log file",
default=CHECKPOINT_INTERVAL_S)
parser.add_argument('-r', action='store', dest='sleep_duration',
type=float,
help="sleep duration (s) in main event loop",
default=RETRY_INTERVAL_S)
parser.add_argument('-g', '--gui', action='store_true', default=False,
dest='interface',
help='Use the graphical rather than command-line interface')
parser.add_argument('-v', '--version', action='version', version=version)
results = parser.parse_args()
addresses = results.email_addresses.strip().split(",")
interface = {False: 'command-line', True: 'GUI'}[results.interface]
agent_db = ini_config.Config()
logging.basicConfig(filename=results.log_file, level=logging.INFO)
logger("#"*60)
logger("startup")
logger("trigger PV = " + results.trigger_PV)
logger("message PV = " + results.message_PV)
logger("email list = " + str(addresses) )
logger("log file = " + results.log_file)
logger("logging interval = " + str( results.logging_interval ) )
logger("sleep duration = " + str( results.sleep_duration ) )
logger("interface = " + interface)
user = os.environ.get('LOGNAME', None) or os.environ.get('USERNAME', None)
logger("user = " + user)
logger("host = " + socket.gethostname() )
logger("program = " + sys.argv[0] )
logger("PID = " + str(os.getpid()) )
logger("PyEpics version = " + str(epics.__version__) )
logger("config file = " + agent_db.ini_file )
if results.interface is False:
# When the GUI is not selected,
# ensure the positional arguments are given
tests = [
len(results.trigger_PV),
len(results.message_PV),
len(" ".join(addresses)),
]
if 0 in tests:
parser.print_usage()
sys.exit()
# call the interface
{False: cli, True: gui}[results.interface](results)
if __name__ == '__main__':
main()