hydra_keeper
This file uses the EPICS library when evaluating whether to keep a plot, and places each kept plot in its respective output location. A file handler records all log messages and exceptions, and ‘Hello Hydra’ messages are sent once the socket has bound to the constructed address.
KeeperAnnounce
This method announces that the Keeper is active: it binds a ZMQ PUB socket to the given port and then publishes ‘Hello Hydra’ every 0.5 seconds, indefinitely.
""" To Announce Keeper """
def KeeperAnnounce(context,announcePort):
    print("KEEPER ANNOUNCE")
    zmqport=announcePort
    zmqconnection="tcp://*"
    transcontext = context
    transsocket = transcontext.socket(zmq.PUB)
    toBind=zmqconnection+":%s" % str(zmqport)
    print(toBind)
    try:
        transsocket.bind(toBind)
    except Exception as e:
        print(e)
    while True:
        transsocket.send_string("Hello Hydra")
        time.sleep(.5)
    return
Parameters
context: A ZMQ context representing the connections required for creating the sockets.
announcePort: An integer representing the port on which the keeper’s announcement should be made.
getKeepPercent
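This method constructs an SQL query from its parameters, executes it, and returns the plot type ID together with the fraction of data to keep. If the query does not match exactly one row, the fraction is returned as -1.0; if the query fails or matches nothing, both values are -1.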
def getKeepPercent(DBConnector, fileName,fileType,isChunked):
    """ Returns Plot Id and fraction of data to keep """
    Percent_q="SELECT CollectPercent,ID FROM Plot_Types where Name=\""+fileName+"\" && FileType=\""+fileType+"\" && IsChunked is NULL"
    if(isChunked):
        Percent_q="SELECT CollectPercent,ID FROM Plot_Types where Name=\""+fileName+"\" && FileType=\""+fileType+"\" && IsChunked is not NULL"
    print(Percent_q)
    try:
        CollectPercent = DBConnector.FetchAll(Percent_q)
        #print(CollectPercent)
        #print(CollectPercent[0])
        if(len(CollectPercent)==1):
            return CollectPercent[0]["ID"],float(CollectPercent[0]["CollectPercent"])
        else:
            return CollectPercent[0]["ID"],-1.0
    except Exception as e:
        print(e)
        return -1,-1
Parameters
DBConnector: An object representing the connector for the database that is responsible for executing queries.
fileName: A string representing name of the file.
fileType: A string representing type of the file.
isChunked: A boolean representing whether the file is chunked or not.
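The query above is assembled by string concatenation, so a file name containing a quote character would break it. The following is a minimal sketch of the same lookup using bound parameters instead. It is not the Keeper's implementation: `get_keep_percent` is a hypothetical name, the connection is a plain DB-API connection rather than the DBConnector object, and an in-memory SQLite table with made-up rows stands in for the real database (SQLite uses `?` placeholders and `AND`; other drivers may use a different placeholder style).

```python
import sqlite3


def get_keep_percent(conn, file_name, file_type, is_chunked):
    """Return (plot_type_id, keep_fraction) with the same fallbacks as getKeepPercent."""
    # Only the NULL test varies; user-supplied values are bound, never concatenated.
    chunk_clause = "IS NOT NULL" if is_chunked else "IS NULL"
    query = (
        "SELECT CollectPercent, ID FROM Plot_Types "
        "WHERE Name = ? AND FileType = ? AND IsChunked " + chunk_clause
    )
    rows = conn.execute(query, (file_name, file_type)).fetchall()
    if not rows:
        return -1, -1.0          # nothing matched
    percent, plot_id = rows[0]
    if len(rows) == 1:
        return plot_id, float(percent)
    return plot_id, -1.0         # ambiguous match: ID only, no usable fraction


# Hypothetical table contents, for demonstration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Plot_Types "
    "(ID INTEGER, Name TEXT, FileType TEXT, IsChunked INTEGER, CollectPercent REAL)"
)
conn.execute("INSERT INTO Plot_Types VALUES (7, 'occupancy', 'png', NULL, 0.25)")

print(get_keep_percent(conn, "occupancy", "png", False))  # (7, 0.25)
print(get_keep_percent(conn, "missing", "png", False))    # (-1, -1.0)
```

Binding the values leaves quoting to the driver, which is why only the fixed `IS NULL` / `IS NOT NULL` fragment is concatenated.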
ConfirmVerdict
This method retrieves the verdict from the AI report and compares its confidence against the confirmation threshold configured for that verdict. It returns the confirmation status ("Confirmed" or "Unconfirmed") followed by the verdict.
def ConfirmVerdict(Model_config, AIReport, VerdictConfidence):
    """ To confirm the verdict from the model """
    verdict=AIReport.getVerdict()
    ConfirmationThreshold = Model_config['Thresholds'][verdict]
    if(VerdictConfidence>=ConfirmationThreshold):
        return "Confirmed", verdict
    else:
        return "Unconfirmed", verdict
Parameters
Model_config: A dictionary representing the configuration of the model.
AIReport: An AIReport object representing the AI Report.
VerdictConfidence: A float representing the confidence percentage of the verdict.
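To illustrate the threshold comparison, here is a small self-contained sketch. `StubReport` is a hypothetical stand-in that provides only `getVerdict()`, `confirm_verdict` restates the logic above under a different name, and the class names and threshold values are invented for the example.

```python
class StubReport:
    """Hypothetical stand-in for AIReport; only getVerdict() is needed here."""

    def __init__(self, verdict):
        self._verdict = verdict

    def getVerdict(self):
        return self._verdict


def confirm_verdict(model_config, ai_report, verdict_confidence):
    """Compare the confidence with the per-verdict threshold, as ConfirmVerdict does."""
    verdict = ai_report.getVerdict()
    threshold = model_config["Thresholds"][verdict]
    status = "Confirmed" if verdict_confidence >= threshold else "Unconfirmed"
    return status, verdict


# Invented thresholds: each classification has its own cut-off.
model_config = {"Thresholds": {"Good": 0.90, "Bad": 0.75}}

print(confirm_verdict(model_config, StubReport("Bad"), 0.80))   # ('Confirmed', 'Bad')
print(confirm_verdict(model_config, StubReport("Good"), 0.80))  # ('Unconfirmed', 'Good')
```

The same confidence can therefore be confirmed for one classification and unconfirmed for another, because the threshold is looked up by verdict.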
AnalyzeReport
This method analyzes the AI Report and updates the database accordingly, along with handling EPICS functionality if relevant.
# Extended code available on GitHub
def AnalyzeReport(DBConnector, Model_config, AIReport, outputlocation, RunPeriod, RunNumber_padding, RunNumber, reportMetaData, beam_current_name, beam_current_threshold, epics_root)
Parameters
DBConnector: An object representing the connector for the database that executes queries.
Model_config: A dictionary containing the configuration for the model.
AIReport: An AIReport object representing the AI report.
outputlocation: A string representing the output location.
RunPeriod: A string representing the run period.
RunNumber_padding: An integer representing the padding for the run number.
RunNumber: An integer representing the run number.
reportMetaData: A dictionary representing the metadata of the AI report.
beam_current_name: A string representing the name of the beam current.
beam_current_threshold: A float representing the threshold value for the beam current.
epics_root: A string representing the root for the EPICS library.
SetStore
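This method decides by random sampling whether to keep the plot. It draws a random number and returns immediately if the draw is not below percent or if test_mode is set. Otherwise it checks the database for an existing entry with the same plot type, run period, run number and chunk, and moves the file only if none exists.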
def SetStore(DBConnector, Plot_Type_ID,chunkNum,item,percent,RunPeriod,RunNumber_padding,RunNumber,outputlocation,test_mode):
    """ To keep or remove the file """
    print("Checking", Plot_Type_ID,"against", float(percent))
    if(random.random()>=float(percent) or test_mode):
        return
    else:
        already_exists_q="SELECT * FROM Plots where Plot_Types_ID="+str(Plot_Type_ID)+" && RunPeriod=\""+RunPeriod+"\" && RunNumber="+str(RunNumber)+" && Chunk="+str(chunkNum)
        Existing_entry = DBConnector.FetchAll(already_exists_q)
        if(len(Existing_entry)==0):
            print("moving",item,"-------------->",outputlocation)
            moveFile(outputlocation, RunPeriod, RunNumber_padding,RunNumber, item)
        else:
            print("already exists")
Parameters
DBConnector: An object representing the connector for the database that executes queries.
Plot_Type_ID: An integer representing the ID of the type of plot.
chunkNum: An integer representing the chunk number of the file.
item: A dictionary-like object describing the file; its 'inDATA' entry holds the path of the source file.
percent: A float between 0 and 1 representing the fraction of files to keep.
RunPeriod: A string representing the run period.
RunNumber_padding: An integer representing the padding for the run number.
RunNumber: An integer representing the run number.
outputlocation: A string representing the name of the output location.
test_mode: A boolean; when true, the method returns without storing anything.
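The first check in SetStore is a simple sampling rule: a file is a candidate for storage only when a uniform draw in [0, 1) falls below percent. The sketch below isolates that rule so its effect can be seen. `should_keep` is a hypothetical helper, not part of the Keeper, and the seed and trial count are arbitrary.

```python
import random


def should_keep(percent, test_mode=False, rng=random):
    """Mirror SetStore's first check: keep only if a uniform draw falls below percent."""
    if test_mode:
        return False
    return rng.random() < float(percent)


# With percent = 0.2, roughly one file in five passes the check.
rng = random.Random(12345)  # arbitrary seed, only to make the run repeatable
trials = 10_000
kept = sum(should_keep(0.2, rng=rng) for _ in range(trials))
print(kept / trials)  # a value near 0.2
```

A percent of 1.0 keeps every file and 0.0 keeps none, since the draw is always at least 0 and strictly below 1.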
GetKeeperConfig
This method returns the configuration for the keeper from the database in the form of a dictionary.
def GetKeeperConfig(DBConnector):
    """ To get the keeper config from the database """
    json_dict={}
    json_dict["Models"]={}
    models_q="SELECT Distinct Model_ID from ModelThresholds order by Model_ID asc;"
    models=DBConnector.FetchAll(models_q)
    for m in models:
        #print(m["Model_ID"])
        json_dict["Models"][str(m["Model_ID"])]={}
        main_q="SELECT Model_ID,labels,Classification,Threshold from ModelThresholds as mt inner join Plot_Classifications as pc on pc.ID=mt.Plot_Classification_ID inner join Models on Model_ID=Models.ID where Models.ID="+str(m["Model_ID"])+";"
        thresholds=DBConnector.FetchAll(main_q)
        Thresholds_dict={}
        # labels is only defined when at least one threshold row exists,
        # so the model entry is filled in inside this branch
        if(len(thresholds)>0):
            original_dict_string=str(thresholds[0]["labels"],'utf-8')
            original_dict = eval(original_dict_string)
            labels = {v: k for k, v in original_dict.items()}
            for t in thresholds:
                Thresholds_dict[t["Classification"]]=t["Threshold"]
            json_dict["Models"][str(m["Model_ID"])]["Labels"]=labels
            json_dict["Models"][str(m["Model_ID"])]["Thresholds"]=Thresholds_dict
    return json_dict
Parameters
DBConnector: An object representing the connector for the database that executes queries.
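The label handling in GetKeeperConfig decodes a stored byte string into a dictionary and then swaps its keys and values. The sketch below shows that step on its own and the shape of the resulting configuration. It swaps in `ast.literal_eval` for the `eval` call used above, which accepts only Python literals; `invert_labels` is a hypothetical helper, and the label names, model ID and thresholds are invented.

```python
import ast


def invert_labels(raw_labels):
    """Decode a stored labels blob and swap keys and values."""
    # literal_eval parses dict/list/str/number literals only, unlike eval.
    original = ast.literal_eval(raw_labels.decode("utf-8"))
    return {v: k for k, v in original.items()}


# Invented example of what the labels column might hold.
raw = b"{'Good': 0, 'Bad': 1}"
labels = invert_labels(raw)

# Same nesting as the dictionary built by GetKeeperConfig, with made-up values.
config = {
    "Models": {
        "3": {"Labels": labels, "Thresholds": {"Good": 0.9, "Bad": 0.75}}
    }
}
print(config["Models"]["3"]["Labels"])  # {0: 'Good', 1: 'Bad'}
```

After inversion the class index returned by a model can be mapped directly to its classification name, and that name is the key into the Thresholds dictionary.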
moveFile
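This method copies a given file from its input location into a run-specific directory under the output location, creating that directory if needed. Despite its name, it uses copyfile, so the source file is left in place. If outputlocation is "NULL", nothing is copied and a message asks for the location to be supplied via -ol.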
def moveFile(outputlocation, RunPeriod,RunNumber_padding, RunNumber, item):
    """ To move file from input location to outputlocation """
    if(outputlocation!="NULL"):
        os.makedirs(outputlocation+str(RunNumber).zfill(RunNumber_padding)+"/",exist_ok=True)
        print("Copying %s to %s" % (item['inDATA'],outputlocation+str(RunNumber).zfill(RunNumber_padding)+"/"+item['inDATA'].split("/")[-1]))
        copyfile(item['inDATA'],outputlocation+str(RunNumber).zfill(RunNumber_padding)+"/"+item['inDATA'].split("/")[-1])
    else:
        print("I should copy this file but don't know where to copy it to....please supply outputlocation via -ol")
Parameters
outputlocation: A string representing the name of the output location, or "NULL" if none was supplied.
RunPeriod: A string representing the run period.
RunNumber_padding: An integer representing the padding for the run number.
RunNumber: An integer representing the run number.
item: A dictionary-like object describing the file; its 'inDATA' entry holds the path of the source file.
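The destination path is built by plain string concatenation: the output location, the run number zero-padded to RunNumber_padding digits, and the base name of the source file. The sketch below reproduces only that path construction. `build_destination` is a hypothetical helper and the paths are invented.

```python
def build_destination(outputlocation, run_number, run_number_padding, source_path):
    """Build the destination path the same way moveFile does."""
    # zfill left-pads the run number with zeros to the requested width.
    run_dir = outputlocation + str(run_number).zfill(run_number_padding) + "/"
    # Only the last path component of the source is kept.
    return run_dir + source_path.split("/")[-1]


print(build_destination("/work/hydra/kept/", 4217, 6, "/tmp/in/occupancy_0001.png"))
# /work/hydra/kept/004217/occupancy_0001.png
```

Because the pieces are concatenated rather than joined with os.path.join, outputlocation needs to end with a slash; otherwise the run number is appended directly to the last directory name.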