Source code for km3dq_grl.fact-add

#! /usr/bin/env python
###############################################################################
import getpass
import os
import re
import readline  # Mandatory for proper backspace handling. Do not remove
import subprocess
import sys
import time

from km3dq_grl import fact_path

from km3dq_common.common_library import get_site
from km3dq_common.common_library import get_run_properties_from_db

from km3dq_common.config_library import configure_dataquality_tag
from km3dq_common.config_library import configure_fact


###############################################################################
def get_current_git_branch():
    """Return the name of the current git branch.

    Runs ``git branch`` and looks for the line flagged with ``*``.

    Returns:
        The flagged line with every ``*`` and whitespace character removed,
        or ``"Unknown"`` when no line is flagged (for instance when the
        command prints nothing).
    """
    # The context manager closes the pipe (and waits for git) in every case;
    # the bare os.popen() handle was previously leaked on the early return.
    with os.popen("git branch") as g_branch:
        branch_lines = g_branch.readlines()

    # Compiled once instead of once per matching line
    rsb = re.compile(r"\*|\s|\n")
    for i_branch in branch_lines:
        if "*" in i_branch:
            return rsb.sub("", i_branch)
    return "Unknown"
###############################################################################
# User inputs
# Detector affected: ask again until the detector is listed for its site in
# the default data-quality tag
answer = False
while not answer:
    det = input("Detector ([A] / [O] for the current ARCA/ORCA detector): ")
    # Single-letter shortcuts; any other reply is used as typed
    det = {"A": "D0ARCA028", "O": "D1ORCA019"}.get(det, det)
    site = get_site(det)
    dq_tag = configure_dataquality_tag('default')
    answer = det in dq_tag['det'][site]
    if not answer:
        print("Unknown detector!")

# Fact type
answer = False
print("Fact type: ")
# Fact type: list the configured types and let the user pick one by its index
facts = configure_fact()
avail_answer = list(facts['type'])
for i_fact, fact_type in enumerate(avail_answer):
    print(f"{i_fact}. {fact_type}")
answer = False
while answer is False:
    fact_type = input("")
    try:
        fact_type_int = int(fact_type)
    except ValueError:
        print("Please enter an integer")
        continue
    # Explicit range check: a negative number would otherwise be accepted
    # silently as an index counted from the end of the list.
    if 0 <= fact_type_int < len(avail_answer):
        fact_type = avail_answer[fact_type_int]
        answer = True
    else:
        print(f"Please enter an integer between 0 and {len(avail_answer)-1}")

# Run start
answer = False
while answer is False:
    run = input("Run start: ")
    try:
        run_start = int(run)
        answer = True
    except ValueError:
        print("Please enter an integer")

# Run stop: 0 or a negative number means that there is no stop run
answer = False
while answer is False:
    run = input("Run stop (if unknown or irrelevant, reply 0): ")
    try:
        if int(run) > 0:
            run_stop = int(run)
        else:
            run_stop = "Unknown / irrelevant"
        answer = True
    except ValueError:
        print("Please enter an integer")

# Date
# Run properties of the detector, indexed by run number in the lookups below
rp_db = get_run_properties_from_db(det, "")

# Only the database lookup can raise KeyError, so it is the only statement
# kept in the try body. A failed lookup now ends the script with a non-zero
# status instead of reporting success.
try:
    # NOTE(review): presumably UNIXSTARTTIME is in milliseconds, hence the
    # division by 1e3 before time.localtime - confirm
    date_start = int(rp_db[run_start]['UNIXSTARTTIME'])/1e3
except KeyError:
    print("Unable to retrieve the run start from the db")
    sys.exit(1)
date_start_strf = time.strftime("%a, %d %b %Y %H:%M",
                                time.localtime(int(date_start)))
time_range = f"{date_start_strf} - "

if run_stop != "Unknown / irrelevant":
    try:
        # The start time of the stop run is what closes the displayed range
        date_stop = int(rp_db[run_stop]['UNIXSTARTTIME'])/1e3
    except KeyError:
        print("Unable to retrieve the run stop from the db")
        sys.exit(1)
    date_stop_strf = time.strftime("%a, %d %b %Y %H:%M",
                                   time.localtime(date_stop))
    time_range += date_stop_strf
print(time_range)

# Issue / comment: at least 10 characters are required
answer = False
while answer is False:
    issue_comment = input("Issue / Comment: ")
    if len(issue_comment) < 10:
        print("Entry too short... Please add more details")
    else:
        answer = True

# Documentation
documentation = input("Documentation (elog, gitlab issue...): ")

# Author: an empty reply falls back to the login name, and today's date is
# appended in both cases. A single question is enough: every reply is
# accepted, so the former retry loop never iterated twice.
whoami = getpass.getuser()
author = input(f"Author (default: {whoami}): ")
if author == "":
    author = whoami
date = time.strftime("%d/%m/%y", time.localtime())
author += f" ({date})"

###############################################################################
# Treatment of user inputs
# Create the Facts directory if not yet existing.
# os.makedirs runs synchronously and creates missing parents, whereas the
# former os.popen("mkdir ...") was not waited for and could still be running
# (or have failed) when the fact file is written.
os.makedirs(f"{site}/{det}/Facts/", exist_ok=True)

# New entries (one per run)
# Path of the fact file for the selected fact type and detector
fact_file = fact_path(fact_type, det)
# One record: the six fields separated by " | " and closed by a newline
new_entry = " | ".join(
    str(field)
    for field in (run_start, run_stop, time_range,
                  issue_comment, documentation, author)
) + "\n"
# Git treatment
# If the current branch is main, a new branch is always created.
# Otherwise, the choice is left to the user.
cur_git_branch = get_current_git_branch()
if cur_git_branch == "main":
    print("You are currently in main git branch. I will create a new one "
          "for your fact upload\n")
    git_branch = cur_git_branch
    new_branch = "y"
else:
    print(f"You are currently in {cur_git_branch} branch. You can either "
          "keep on working on this branch for a bulk upload or create a "
          "new one")
    answer = False
    while answer is False:
        new_branch = input("Do you want to create a new branch?"
                           "([y]es, [n]o):")
        if new_branch not in ("y", "n"):
            print("Type y or n")
        else:
            answer = True

# Checks that the fact file exists and ask for a final confirmation
if os.path.exists(fact_file) is False:
    print(f"Fact file {fact_file} does not exists."
          "I am creating it and adding it to git repository.")
    # NOTE(review): this header lists "Start date" first, while new_entry
    # starts with the first run and carries the time range third - confirm
    # which column order the fact files are expected to follow.
    with open(fact_file, "w", encoding="utf-8") as new_file:
        new_file.write("Start date | "
                       "First affected run | "
                       "Last affected run | "
                       "Issue / Comment | "
                       "Documentation | "
                       "Author\n")
    # Argument list instead of a shell string: the call is waited for and the
    # path is never interpreted by a shell. stdout is discarded, as it was
    # with the unread os.popen pipe.
    git_add = subprocess.run(["git", "add", fact_file],
                             stdout=subprocess.DEVNULL, check=False)
    if git_add.returncode != 0:
        print(f"Warning: unable to add {fact_file} to the git repository")

print(f"I am about to add the following fact for the {det} detector:\n"
      f"{new_entry}\n"
      f"Fact file: {fact_file}")
# Final confirmation: only the exact replies YES / NO are accepted
answer = False
while answer is False:
    confirm = input("Please confirm by typing [YES] or [NO]:")
    if confirm not in ("YES", "NO"):
        print("Type YES or NO")
    else:
        answer = True
if confirm == "NO":
    sys.exit()

# Create the new branch if requested.
# The git commands are run with subprocess.run and an argument list: each
# call is waited for before the next step starts (os.popen returned
# immediately, so checkout, mv and commit could race), and no user-typed text
# goes through a shell. stdout is discarded, as it was with the unread pipes.
if new_branch == "y":
    branch_time = time.strftime("%d_%m_%y_%H_%M", time.localtime())
    git_branch = f"main-fact-upload-{branch_time}"
    cmd = ["git", "checkout", "-b", git_branch]
    if subprocess.run(cmd, stdout=subprocess.DEVNULL,
                      check=False).returncode != 0:
        # Stop here rather than committing on the branch the user wanted to
        # leave
        print(f"Unable to create the {git_branch} branch. "
              "The fact has not been added.")
        sys.exit(1)
    print(f"You are now working in {git_branch} branch.")
else:
    git_branch = cur_git_branch

# And finally update the file
with open(fact_file, "r", encoding="utf-8") as orig:
    lines = orig.readlines()
# Keep the header and the new entry on separate lines
if lines and not lines[0].endswith("\n"):
    lines[0] += "\n"
# New entry is at the top of the fact file, right below the header line.
# With an empty file, insert() simply stores it as the only line (it used to
# be dropped silently).
lines.insert(1, new_entry)
with open(f"{fact_file}_tmp", "w", encoding="utf-8") as fin:
    fin.writelines(lines)
# Synchronous replacement of the former un-awaited "mv"
os.replace(f"{fact_file}_tmp", fact_file)

# The commit message is a single argument, so quotes typed in the author
# field cannot break or extend the command
cmd = ["git", "commit", fact_file, "-m", f"{det} fact added by {author}"]
if subprocess.run(cmd, stdout=subprocess.DEVNULL,
                  check=False).returncode != 0:
    print(f"The git commit failed: {fact_file} has been updated but the "
          "change is not commited.")
    sys.exit(1)

# The last two sentences were separate print() arguments and ended up on the
# same line; they are now separated by a newline.
print("The fact upload has been commited in the new branch.\n"
      "You can add more facts in the same branch.\n\n"
      "When your modifications are complete, proceed with:\n"
      f"- git push --set-upstream origin {git_branch}\n"
      "- Request the merging\n"
      "Once the branch is merged, the fact page will be automatically "
      "updated\n")
print("Do not forget to git fetch / git merge when you are back in "
      "the main branch")