Source code for km3dq_grl.defect-add

#! /usr/bin/env python
###############################################################################
import os
import re
import time
import sys
import getpass
import readline  # Mandatory for proper backspace handling. Do not remove

from km3dq_grl import defect_path

from km3dq_common.common_library import get_site
from km3dq_common.common_library import get_run_properties_from_qaqc

from km3dq_common.config_library import configure_dataquality_tag
from km3dq_common.config_library import configure_defect

###############################################################################
[docs] def get_current_git_branch(): """ Return the current git branch""" g_branch = os.popen("git branch") for i_branch in g_branch: if "*" in i_branch: rsb = re.compile(r"\*|\s|\n") return rsb.sub("", i_branch) return "Unknown"
###############################################################################
[docs] def return_affected_runs(input_run, dataset): # Single run r_test_single = re.compile(r"\s*([0-9]+)\s*") r_t = r_test_single.fullmatch(input_run) if r_t: run_list = [] run_list.append(int(r_t.group(1))) return run_list # Run list r_test_list = re.compile(r"[0-9]+\s*[, \s 0-9]*") r_t = r_test_list.fullmatch(input_run) if r_t: r_test_list2 = re.compile(r"[0-9]+") run_list = r_test_list2.findall(input_run) return run_list # Run range r_test_range = re.compile(r"\s*([0-9]+)\s*-\s*([0-9]+)\s*") r_t = r_test_range.fullmatch(input_run) if r_t: run1 = int(r_t.group(1)) run2 = int(r_t.group(2)) dq_tag_0 = configure_dataquality_tag('default') (run_qaqc, _) = get_run_properties_from_qaqc(dataset, dq_tag_0, "qaqc_sftp", run1, run2) run_list = run_qaqc['runNumber'] if len(run_list) > 0: print("I have found in the JQAQC file the following runs in the " f"run range {run1} - {run2}:") print(run_list) print("Some runs may be missing if not yet processed.") answer0 = False while answer0 is False: confirm0 = input("Please confirm by typing YES or NO:") if confirm0 not in ("YES", "NO"): print("Type YES or NO") else: answer0 = True if confirm0 == "YES": return run_list sys.exit() else: print("I have not found any run in the JQAQC file in the" f"run range {run1} - {run2}. Either there is no much PHYS" "in this range, either they are not yet processed.") # No pattern found ! return []
############################################################################### # User inputs # Detector affected answer = False while answer is False:
[docs] det = input("Detector ([A] / [O] for the current ARCA/ORCA detector): ")
if det == "A": det = "D0ARCA028" elif det == "O": det = "D1ORCA019" site = get_site(det) dq_tag = configure_dataquality_tag('default') if det not in dq_tag['det'][site]: print("Unknown detector!") else: answer = True # Defect type answer = False print("Defect type: ") avail_answer = []
[docs] defects = configure_defect()
for i_def in defects['bit'].keys(): avail_answer.append(i_def) for i_def, def_type in enumerate(avail_answer): print(f"{i_def}. {def_type}") while answer is False:
[docs] def_type = input("")
try: def_type_int = int(def_type) def_type = avail_answer[def_type_int] answer = True except ValueError: print("Please enter an integer") except IndexError: print(f"Please enter an integer between 0 and {len(avail_answer)-1}") # Defect description answer = False print("Defect description: ")
[docs] avail_answer = []
for i_def in defects['bit'][def_type].keys(): avail_answer.append(i_def) for i_def, def_descr in enumerate(avail_answer): print(f"{i_def}. {def_descr}") while answer is False:
[docs] def_descr = input("")
try: def_descr_int = int(def_descr) def_descr = avail_answer[def_descr_int] answer = True except ValueError: print("Please enter an integer") except IndexError: print(f"Please enter an integer between 0 and {len(avail_answer)-1}") # Defect tag def_tag = input("Defect tag (by default: def-HEAD): ") if def_tag == "":
[docs] def_tag = "def-HEAD"
# Run affected answer = False while answer is False:
[docs] run = input("Run, run list (separated by a coma)" " or run range (separated by a dash): ")
processed_runs = return_affected_runs(run, det) if len(processed_runs) == 0: print("No run found!") else: print(f"The {len(processed_runs)} following runs will be added:" f"{processed_runs}") answer = True # Issue / comment answer = False while answer is False:
[docs] issue_comment = input("Issue / Comment: ")
if len(issue_comment) < 10: print("Entry too short... Please add more details") else: answer = True # Documentation print("If you do not have any elog reference, you can try searching " "with the link(s):") for i_run in processed_runs: if "ARCA" in det: print(f"- https://elog.km3net.de/Operations+IT/?subtext={i_run}") else: print(f"- https://elog.km3net.de/Operations+FR/?subtext={i_run}")
[docs] documentation = input("Documentation (elog, gitlab issue...): ")
# Recoverability answer = False while answer is False:
[docs] recov = input("Recoverable? ([y]es, [n]o or [u]nknown): ")
if recov not in ("y", "n", "u"): print("Type y, n or u") else: if recov == "u": recov = "?" answer = True # Author answer = False
[docs] whoami = getpass.getuser()
while answer is False:
[docs] author = input(f"Author (default: {whoami}): ")
if author == "": author = whoami date = time.strftime("%d/%m/%y", time.localtime()) author += f" ({date})" answer = True ############################################################################### # Treatment if user inputs # New entries (one per run) # def_file = f"{site}/{det}/Defects/{def_type}_{def_descr}_{def_tag}.txt"
[docs] def_file = defect_path(def_type, def_descr, det, def_tag)
[docs] new_entry = ""
for i_run in processed_runs: new_entry += (f"{i_run} | " f"{issue_comment} | " f"{documentation} | " f"{recov} | " f"{author}\n") # Git treatment # If the current branch is main, propose to switch to new created branch # Otherwise, leave the choice.
[docs] cur_git_branch = get_current_git_branch()
if cur_git_branch == "master": print("You are currently in master git branch. I will create a new one " "for your defect upload\n")
[docs] git_branch = cur_git_branch
new_branch = "y" else: print(f"You are currently in {cur_git_branch} branch. You can either " "keep on working on this branch for a bulk upload or create a " "new one") answer = False while answer is False: new_branch = input("Do you want to create a new branch?" "([y]es, [n]o):") if new_branch not in ("y", "n"): print("Type y or n") else: answer = True # Checks that the defect file exist and ask for a final confirmation if os.path.exists(def_file) is False: print(f"Defect file {def_file} does not exists." "I am creating it and adding it to git repository.") with open(def_file, "w", encoding="utf-8") as new_file: new_file.write("Run Number | " "Issue / Comment | " "Documentation | " "Recov. | " "Author\n") cmd = f"git add {def_file}" os.popen(cmd) print(f"I am about to add the following defect for the {det} detector:\n" f"{new_entry}\n" f"Defect file: {def_file}")
[docs] answer = False
while answer is False:
[docs] confirm = input("Please confirm by typing [YES] or [NO]:")
if confirm not in ("YES", "NO"): print("Type YES or NO") else: answer = True if confirm == "NO": sys.exit() # Create the new branch if requested if new_branch == "y":
[docs] branch_time = time.strftime("%d_%m_%y_%H_%M", time.localtime())
git_branch = f"main-defect-upload-{branch_time}" cmd = f"git checkout -b {git_branch}" os.popen(cmd) print(f"You are now working in {git_branch} branch.") else: git_branch = cur_git_branch # And finally update the file with open(def_file, "r", encoding="utf-8") as orig: with open(f"{def_file}_tmp", "w", encoding="utf-8") as fin:
[docs] lines = orig.readlines()
first_line = True for i_line in lines: fin.write(i_line) if first_line: # New entry is at the top of the defect file fin.write(new_entry) first_line = False cmd = f"mv {def_file}_tmp {def_file}" os.popen(cmd)
[docs] cmd = f"git commit {def_file} -m '{det} defect added by {author}'"
os.popen(cmd) print("The defect upload has been commited in the new branch.\n" "You can add more defects in the same branch.\n\n" "When your modifications are complete, proceed with:\n" f"- git push --set-upstream origin {git_branch}\n" "- Request the merging", "Once the branch is merged, the defect page will be automatically " "updated\n") print("Do not forget to git fetch / git merge when you are back in " "the main branch")