#! /usr/bin/env python
###############################################################################
import os
import re
import time
import sys
import getpass
import readline # Mandatory for proper backspace handling. Do not remove
from km3dq_grl import defect_path
from km3dq_common.common_library import get_site
from km3dq_common.common_library import get_run_properties_from_qaqc
from km3dq_common.config_library import configure_dataquality_tag
from km3dq_common.config_library import configure_defect
###############################################################################
[docs]
def get_current_git_branch():
""" Return the current git branch"""
g_branch = os.popen("git branch")
for i_branch in g_branch:
if "*" in i_branch:
rsb = re.compile(r"\*|\s|\n")
return rsb.sub("", i_branch)
return "Unknown"
###############################################################################
[docs]
def return_affected_runs(input_run, dataset):
# Single run
r_test_single = re.compile(r"\s*([0-9]+)\s*")
r_t = r_test_single.fullmatch(input_run)
if r_t:
run_list = []
run_list.append(int(r_t.group(1)))
return run_list
# Run list
r_test_list = re.compile(r"[0-9]+\s*[, \s 0-9]*")
r_t = r_test_list.fullmatch(input_run)
if r_t:
r_test_list2 = re.compile(r"[0-9]+")
run_list = r_test_list2.findall(input_run)
return run_list
# Run range
r_test_range = re.compile(r"\s*([0-9]+)\s*-\s*([0-9]+)\s*")
r_t = r_test_range.fullmatch(input_run)
if r_t:
run1 = int(r_t.group(1))
run2 = int(r_t.group(2))
dq_tag_0 = configure_dataquality_tag('default')
(run_qaqc, _) = get_run_properties_from_qaqc(dataset,
dq_tag_0,
"qaqc_sftp",
run1, run2)
run_list = run_qaqc['runNumber']
if len(run_list) > 0:
print("I have found in the JQAQC file the following runs in the "
f"run range {run1} - {run2}:")
print(run_list)
print("Some runs may be missing if not yet processed.")
answer0 = False
while answer0 is False:
confirm0 = input("Please confirm by typing YES or NO:")
if confirm0 not in ("YES", "NO"):
print("Type YES or NO")
else:
answer0 = True
if confirm0 == "YES":
return run_list
sys.exit()
else:
print("I have not found any run in the JQAQC file in the"
f"run range {run1} - {run2}. Either there is no much PHYS"
"in this range, either they are not yet processed.")
# No pattern found !
return []
###############################################################################
# User inputs
# Detector affected
answer = False
while answer is False:
[docs]
det = input("Detector ([A] / [O] for the current ARCA/ORCA detector): ")
if det == "A":
det = "D0ARCA028"
elif det == "O":
det = "D1ORCA019"
site = get_site(det)
dq_tag = configure_dataquality_tag('default')
if det not in dq_tag['det'][site]:
print("Unknown detector!")
else:
answer = True
# Defect type
answer = False
print("Defect type: ")
avail_answer = []
[docs]
defects = configure_defect()
for i_def in defects['bit'].keys():
avail_answer.append(i_def)
for i_def, def_type in enumerate(avail_answer):
print(f"{i_def}. {def_type}")
while answer is False:
try:
def_type_int = int(def_type)
def_type = avail_answer[def_type_int]
answer = True
except ValueError:
print("Please enter an integer")
except IndexError:
print(f"Please enter an integer between 0 and {len(avail_answer)-1}")
# Defect description
answer = False
print("Defect description: ")
for i_def in defects['bit'][def_type].keys():
avail_answer.append(i_def)
for i_def, def_descr in enumerate(avail_answer):
print(f"{i_def}. {def_descr}")
while answer is False:
try:
def_descr_int = int(def_descr)
def_descr = avail_answer[def_descr_int]
answer = True
except ValueError:
print("Please enter an integer")
except IndexError:
print(f"Please enter an integer between 0 and {len(avail_answer)-1}")
# Defect tag
def_tag = input("Defect tag (by default: def-HEAD): ")
if def_tag == "":
# Run affected
answer = False
while answer is False:
[docs]
run = input("Run, run list (separated by a coma)"
" or run range (separated by a dash): ")
processed_runs = return_affected_runs(run, det)
if len(processed_runs) == 0:
print("No run found!")
else:
print(f"The {len(processed_runs)} following runs will be added:"
f"{processed_runs}")
answer = True
# Issue / comment
answer = False
while answer is False:
if len(issue_comment) < 10:
print("Entry too short... Please add more details")
else:
answer = True
# Documentation
print("If you do not have any elog reference, you can try searching "
"with the link(s):")
for i_run in processed_runs:
if "ARCA" in det:
print(f"- https://elog.km3net.de/Operations+IT/?subtext={i_run}")
else:
print(f"- https://elog.km3net.de/Operations+FR/?subtext={i_run}")
[docs]
documentation = input("Documentation (elog, gitlab issue...): ")
# Recoverability
answer = False
while answer is False:
[docs]
recov = input("Recoverable? ([y]es, [n]o or [u]nknown): ")
if recov not in ("y", "n", "u"):
print("Type y, n or u")
else:
if recov == "u":
recov = "?"
answer = True
# Author
answer = False
[docs]
whoami = getpass.getuser()
while answer is False:
[docs]
author = input(f"Author (default: {whoami}): ")
if author == "":
author = whoami
date = time.strftime("%d/%m/%y", time.localtime())
author += f" ({date})"
answer = True
###############################################################################
# Treatment if user inputs
# New entries (one per run)
# def_file = f"{site}/{det}/Defects/{def_type}_{def_descr}_{def_tag}.txt"
[docs]
def_file = defect_path(def_type, def_descr, det, def_tag)
for i_run in processed_runs:
new_entry += (f"{i_run} | "
f"{issue_comment} | "
f"{documentation} | "
f"{recov} | "
f"{author}\n")
# Git treatment
# If the current branch is main, propose to switch to new created branch
# Otherwise, leave the choice.
[docs]
cur_git_branch = get_current_git_branch()
if cur_git_branch == "master":
print("You are currently in master git branch. I will create a new one "
"for your defect upload\n")
[docs]
git_branch = cur_git_branch
new_branch = "y"
else:
print(f"You are currently in {cur_git_branch} branch. You can either "
"keep on working on this branch for a bulk upload or create a "
"new one")
answer = False
while answer is False:
new_branch = input("Do you want to create a new branch?"
"([y]es, [n]o):")
if new_branch not in ("y", "n"):
print("Type y or n")
else:
answer = True
# Checks that the defect file exist and ask for a final confirmation
if os.path.exists(def_file) is False:
print(f"Defect file {def_file} does not exists."
"I am creating it and adding it to git repository.")
with open(def_file, "w", encoding="utf-8") as new_file:
new_file.write("Run Number | "
"Issue / Comment | "
"Documentation | "
"Recov. | "
"Author\n")
cmd = f"git add {def_file}"
os.popen(cmd)
print(f"I am about to add the following defect for the {det} detector:\n"
f"{new_entry}\n"
f"Defect file: {def_file}")
while answer is False:
[docs]
confirm = input("Please confirm by typing [YES] or [NO]:")
if confirm not in ("YES", "NO"):
print("Type YES or NO")
else:
answer = True
if confirm == "NO":
sys.exit()
# Create the new branch if requested
if new_branch == "y":
[docs]
branch_time = time.strftime("%d_%m_%y_%H_%M", time.localtime())
git_branch = f"main-defect-upload-{branch_time}"
cmd = f"git checkout -b {git_branch}"
os.popen(cmd)
print(f"You are now working in {git_branch} branch.")
else:
git_branch = cur_git_branch
# And finally update the file
with open(def_file, "r", encoding="utf-8") as orig:
with open(f"{def_file}_tmp", "w", encoding="utf-8") as fin:
[docs]
lines = orig.readlines()
first_line = True
for i_line in lines:
fin.write(i_line)
if first_line: # New entry is at the top of the defect file
fin.write(new_entry)
first_line = False
cmd = f"mv {def_file}_tmp {def_file}"
os.popen(cmd)
[docs]
cmd = f"git commit {def_file} -m '{det} defect added by {author}'"
os.popen(cmd)
print("The defect upload has been commited in the new branch.\n"
"You can add more defects in the same branch.\n\n"
"When your modifications are complete, proceed with:\n"
f"- git push --set-upstream origin {git_branch}\n"
"- Request the merging",
"Once the branch is merged, the defect page will be automatically "
"updated\n")
print("Do not forget to git fetch / git merge when you are back in "
"the main branch")