Designed and developed an end-to-end automated ETL pipeline to process unstructured student certificate data (Images & PDFs) into a structured relational database, featuring a custom Human-in-the-Loop (HITL) system for 100% accuracy.
Reduction in manual data entry time
Data accuracy via HITL Quality Control
Automated batch processing architecture
difflib for fuzzy matching to perfectly map misspelled names to official database IDs.Phase 1 is where the system automatically logs into the portal, accesses the ticketing system, and gathers all necessary files (Drive links, PDFs, Images). It then uses Google Gemini AI to analyze the visual certificates and extract structured JSON data according to strict rules.
Phase 2 acts as a Quality Control checkpoint to ensure 100% data accuracy. The system pushes the AI-extracted data to a temporary Google Sheet, pausing the script to allow a human reviewer to audit and edit any incorrect fields before proceeding.
In Phase 3, the system automatically inputs the finalized data into the web portal. It uses Selenium to dynamically fill out the student name, event type, event description, exact event date/time, and the awarded event points.
Used Python Regex and Datetime parsing to handle highly unstructured anomalies outputted by AI, enforcing strict schema compliance.
Recognized AI hallucination risks and proactively architected a REST API bridge to Google Sheets for human-level auditing (HITL).
Engineered a robust API key rotation system and exponential backoff to handle HTTP 429/500 errors, ensuring batches never crash midway.
Constructed strict zero-shot prompts to guide multimodal models (Gemini) into parsing spatial layout documents into JSON schema.
import time, json, requests, difflib, os, re
from datetime import datetime
import google.generativeai as genai
from PIL import Image
import pillow_heif
import fitz # PyMuPDF for reading PDFs
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
# ==========================================
# 1. SETUP & CONFIGURATION
# ==========================================
pillow_heif.register_heif_opener()
# š INSERT ALL YOUR API KEYS HERE
API_KEYS = [
# "AIzaSyB...",
# Add as many as you want
]
# š INSERT FINAL RECAP GOOGLE SHEET URL
GOOGLE_SHEET_URL = "YOUR_FINAL_RECAP_SHEET_URL"
# š INSERT TEMPORARY QC GOOGLE SHEET URL
QC_SHEET_URL = "YOUR_QC_SHEET_URL"
# š INSERT ALL TICKET IDs HERE
TICKET_IDS = [5799, 5801]
# ==========================================
# 2. COMPLETE STUDENT DATABASE
# ==========================================
student_map = {
# "Student Name": "Student_ID"
}
def find_best_student_match(extracted_name, db):
target = extracted_name.lower().replace(".", "").strip()
matched_id, best_full_name = None, None
highest_score = 0
for full_name, s_id in db.items():
clean_db_name = full_name.lower().replace(".", "").strip()
if target in clean_db_name:
return s_id, full_name
score = difflib.SequenceMatcher(None, target, clean_db_name).ratio()
if target.split()[0] == clean_db_name.split()[0]:
score += 0.25
if score > highest_score:
highest_score = score
matched_id, best_full_name = s_id, full_name
return (matched_id, best_full_name) if highest_score > 0.55 else (None, None)
# ==========================================
# 4. MAIN AUTOMATION BOT FUNCTION
# ==========================================
def run_automation():
current_key_index = 0
if not API_KEYS or API_KEYS[0] == "":
print("ā PLEASE INSERT AT LEAST 1 API KEY AT THE TOP!")
return
genai.configure(api_key=API_KEYS[current_key_index])
options = webdriver.ChromeOptions()
options.add_argument("--start-maximized")
options.binary_location = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
driver = webdriver.Chrome(options=options)
wait = WebDriverWait(driver, 30)
def get_genai_model():
available_models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods]
return genai.GenerativeModel(next((m for m in available_models if 'flash' in m), available_models[0]))
model = get_genai_model()
prompt = """
Analyze this image carefully. It might contain ONE certificate, MULTIPLE certificates (like a collage), or NO certificates at all (e.g., a selfie or blank page).
You MUST return a strictly formatted JSON ARRAY `[]` containing objects `{}`.
Rule 1: If the image does NOT contain ANY certificates at all, return exactly this:
[
{
"status": "INVALID"
}
]
Rule 2: If the image contains ONE OR MORE certificates, extract the details for EACH certificate found and return them as separate objects in the array.
Example for an image with 2 certificates:
[
{
"status": "VALID",
"name": "Full Student Name 1",
"category": "STEAM Exposure",
"title": "Clean Event Title 1 Only",
"desc": "Short description 1",
"date": "YYYY-MM-DD",
"time": "HH:MM:SS"
}
]
Data Extraction Rules:
- 'title' should be the actual competition/event name.
- 'category' should be 'STEAM Exposure' for math/science/competitions, or 'Global Insight' for language/culture.
- For 'date': Format strictly as YYYY-MM-DD. IF NO DATE IS FOUND, return "NO_DATE".
- For 'time': Format strictly as HH:MM:SS (24-hour format). IF NO TIME IS FOUND, return "NO_TIME".
"""
pending_entries = []
skipped_records = []
successful_tickets = []
quota_exceeded = False
try:
print("š Logging into the system...")
driver.get("https://your-school-portal.com/login")
driver.find_element(By.NAME, "email").send_keys("admin@your-school-portal.com")
driver.find_element(By.NAME, "password").send_keys("YOUR_PASSWORD")
driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
time.sleep(3)
# ======================================================================
# PHASE 1: DATA EXTRACTION OF ALL TICKETS (AI READING IMAGES)
# ======================================================================
print("\n" + "="*50)
print("š PHASE 1: STARTING DATA EXTRACTION FOR ALL TICKETS")
print("="*50)
for ticket_id in TICKET_IDS:
if quota_exceeded: break
ticket_url = f"https://your-school-portal.com/ticketing/view/{ticket_id}"
driver.get(ticket_url)
time.sleep(3)
requestor_name, requestor_category = "Unknown", "Unknown"
try:
req_p_element = driver.find_element(By.CSS_SELECTOR, "div.requester p")
req_badge = req_p_element.find_element(By.CSS_SELECTOR, "span.badge").text
requestor_category = req_badge.strip().capitalize()
raw_text = req_p_element.text
requestor_name = raw_text.split("(")[0].replace('"', '').strip()
except Exception: pass
try:
attachments = WebDriverWait(driver, 3).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".attachments-list li a"))
)
except TimeoutException: attachments = []
page_text = driver.page_source
gdrive_links = list(set(re.findall(r'(https?://drive\.google\.com/(?:file/d/|open\?id=|view|document/d/)[^\s"\'<>]+)', page_text)))
attachment_urls = []
for a in attachments:
url = a.get_attribute("href")
if "drive.google.com" in url and url not in gdrive_links: gdrive_links.append(url)
elif "drive.google.com" not in url: attachment_urls.append(url)
total_items = len(attachment_urls) + len(gdrive_links)
if total_items == 0:
print(f"ā ļø Ticket {ticket_id} skipped: No attachments.")
skipped_records.append({'id': ticket_id, 'reason': 'No attachments'})
continue
saved_files = []
cookies = {c['name']: c['value'] for c in driver.get_cookies()}
for idx, g_link in enumerate(gdrive_links):
final_filename = f"cert_ticket_{ticket_id}_G{idx}.png"
driver.execute_script("window.open(arguments[0], '_blank');", g_link)
driver.switch_to.window(driver.window_handles[-1])
time.sleep(8)
current_page = driver.page_source.lower()
if "accounts.google.com" in driver.current_url or "minta akses" in current_page or "request access" in current_page:
print(f"š Ticket {ticket_id}: GDrive Access Locked!")
skipped_records.append({'id': f"{ticket_id} (Link {idx+1})", 'reason': 'GDrive Link Locked'})
else:
driver.save_screenshot(final_filename)
saved_files.append(final_filename)
driver.close()
driver.switch_to.window(driver.window_handles[0])
for idx, file_url in enumerate(attachment_urls):
try:
time.sleep(1)
file_data = requests.get(file_url, cookies=cookies).content
temp_filename = f"cert_ticket_{ticket_id}_F{idx}_temp"
final_filename = f"cert_ticket_{ticket_id}_F{idx}.png"
with open(temp_filename, "wb") as f: f.write(file_data)
if file_data.startswith(b'%PDF'):
doc = fitz.open(temp_filename)
page = doc.load_page(0)
pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
pix.save(final_filename)
saved_files.append(final_filename)
else:
with Image.open(temp_filename) as img:
converted_img = img.convert("RGB")
converted_img.save(final_filename, "PNG")
saved_files.append(final_filename)
os.remove(temp_filename)
except Exception: pass
if len(saved_files) == 0:
if not any(item['id'] == ticket_id for item in skipped_records):
skipped_records.append({'id': ticket_id, 'reason': 'Failed to download all attachments'})
continue
for file_idx, file in enumerate(saved_files):
print(f"š§ Processing Ticket {ticket_id} (File {file_idx + 1})...")
ai_success = False
certificates_found = []
retry_429 = 0
while True:
try:
time.sleep(4)
response = model.generate_content([prompt, Image.open(file)])
raw_text = response.text.replace("```json", "").replace("```", "").strip()
match = re.search(r'\[.*\]', raw_text, re.DOTALL)
if match: certificates_found = json.loads(match.group(0))
else: certificates_found = [json.loads(raw_text)]
ai_success = True
break
except Exception as e:
error_msg = str(e)
if "429" in error_msg:
retry_429 += 1
if retry_429 > 1:
print(f"ā ļø API Key {current_key_index + 1} Limit Reached. Switching Key...")
current_key_index += 1
if current_key_index < len(API_KEYS):
genai.configure(api_key=API_KEYS[current_key_index])
model = get_genai_model()
retry_429 = 0
time.sleep(3)
continue
else:
quota_exceeded = True
break
else:
time.sleep(60)
continue
elif "504" in error_msg or "500" in error_msg or "503" in error_msg:
skipped_records.append({'id': f"{ticket_id} (File {file_idx+1})", 'reason': 'Image Too Large (AI Timeout)'})
break
else: break
try: os.remove(file)
except: pass
if quota_exceeded or not ai_success: break
for cert_idx, cert_data in enumerate(certificates_found):
if cert_data.get('status') == "INVALID" or cert_data.get('name') == "NO_CERTIFICATE":
skipped_records.append({'id': f"{ticket_id} (File {file_idx+1})", 'reason': 'Not a certificate image'})
continue
student_name_ai = cert_data.get("name", "Unknown")
s_id, db_name = find_best_student_match(student_name_ai, student_map)
final_student_name = db_name if s_id else f"{student_name_ai} (Not in DB)"
raw_date = cert_data.get('date', 'NO_DATE')
if raw_date == "NO_DATE" or raw_date == "" or not raw_date:
raw_date = datetime.now().strftime("%Y-%m-%d")
raw_time = cert_data.get('time', 'NO_TIME')
if raw_time == "NO_TIME" or raw_time == "" or not raw_time:
raw_time = "09:00:00"
pending_entries.append({
"ticket_id": ticket_id,
"file_idx": file_idx + 1,
"cert_idx": cert_idx + 1,
"s_id": s_id,
"final_student_name": final_student_name,
"category_ai": cert_data.get("category", "STEAM Exposure"),
"title_ai": cert_data.get("title", "Untitled Event"),
"desc_ai": cert_data.get("desc", ""),
"poin_ai": "5",
"raw_date": raw_date,
"raw_time": raw_time,
"requestor_name": requestor_name,
"requestor_category": requestor_category,
"total_items": total_items,
"len_certificates": len(certificates_found)
})
# ======================================================================
# PHASE 2: SYNCHRONIZATION TO GOOGLE SHEET FOR MANUAL QC
# ======================================================================
if len(pending_entries) > 0 and not quota_exceeded:
print("\n\n" + "="*70)
print("š¤ PHASE 2: SENDING TEMPORARY DATA TO QC GOOGLE SHEET")
print("="*70)
try:
print("ā³ Sending data to QC Sheet...")
requests.post(QC_SHEET_URL, json={"action": "write", "data": pending_entries})
print(f"ā
Data successfully sent! Please open your QC Google Sheet.")
print("š Time column is now visible and editable (Default: 09:00:00).")
print(" (You can hide columns J to P to keep it tidy)")
input("\nš ļø PRESS [ENTER] HERE WHEN YOU ARE DONE EDITING IN GOOGLE SHEET... ")
print("š„ Pulling back edited data from Google Sheet...")
response = requests.get(f"{QC_SHEET_URL}?action=read")
qc_data = response.json()
if "data" in qc_data:
pending_entries = qc_data["data"]
print(f"ā
Finished pulling {len(pending_entries)} QC-ed data entries!")
else:
print("ā ļø Failed to format QC data. Continuing with original AI data.")
except Exception as e:
print(f"ā Error during QC Google Sheet synchronization: {e}")
print("ā ļø Program continuing with original data (before QC).")
# ======================================================================
# PHASE 3: DATA INPUT TO WEBSITE DATABASE (FULLY AUTOMATED)
# ======================================================================
if len(pending_entries) > 0 and not quota_exceeded:
print("\n" + "="*50)
print("š PHASE 3: STARTING DATA INPUT TO WEB DATABASE")
print("="*50)
for entry in pending_entries:
try:
driver.get("https://your-school-portal.com/data-entry-url")
time.sleep(3)
if entry['s_id']:
driver.execute_script("$('#student_id').val(arguments[0]).trigger('change');", entry['s_id'])
Select(driver.find_element(By.NAME, "event_type")).select_by_visible_text(entry['category_ai'])
driver.find_element(By.NAME, "event_title").send_keys(entry['title_ai'])
desc = entry.get('desc_ai', '').strip()
if not desc or desc == "None":
desc = f"Certificate for the event {entry.get('title_ai', 'this')}"
driver.find_element(By.NAME, "event_description").send_keys(desc)
driver.find_element(By.NAME, "point").send_keys(str(entry['poin_ai']))
# --- IMPORTANT FIX: COMBINED DATE AND TIME (DateTime) ---
# 1. Clean Date Format (Must be YYYY-MM-DD)
raw_date = str(entry.get('raw_date', '')).strip()[:10]
if not re.match(r"^\d{4}-\d{2}-\d{2}$", raw_date):
raw_date = datetime.now().strftime("%Y-%m-%d")
# 2. Clean Time Format (Must be HH:MM:SS)
raw_time = str(entry.get('raw_time', '')).strip()
if len(raw_time) == 5: # If sheet only sends "14:30"
raw_time += ":00" # Make it "14:30:00"
if not re.match(r"^\d{2}:\d{2}:\d{2}$", raw_time):
raw_time = "09:00:00" # Revert to default if format is completely broken
# 3. Combine exactly as requested: "YYYY-MM-DDTHH:MM:SS"
EVENT_DATE = f"{raw_date}T{raw_time}"
# 4. Inject to website via JavaScript + Trigger Event
driver.execute_script(f"""
var dateField = document.getElementById('date') || document.querySelector('input[type="datetime-local"]');
if (dateField) {{
dateField.value = '{EVENT_DATE}';
dateField.dispatchEvent(new Event('input', {{ bubbles: true }}));
dateField.dispatchEvent(new Event('change', {{ bubbles: true }}));
}}
""")
# ----------------------------------------------------
print(f"š¾ Saving data for {entry['final_student_name']} (Ticket {entry['ticket_id']})...")
driver.find_element(By.NAME, "publish").click()
time.sleep(4)
cert_identifier = str(entry['ticket_id'])
if int(entry['total_items']) > 1 or int(entry['len_certificates']) > 1:
cert_identifier += f" (C-{entry['file_idx']}.{entry['cert_idx']})"
# RECAP TO MAIN GOOGLE SHEET (FINAL)
try:
payload = {
"recap_type": "Success",
"id": cert_identifier,
"name": entry['final_student_name'],
"req_name": entry['requestor_name'],
"req_category": entry['requestor_category'],
"category": entry['category_ai']
}
requests.post(GOOGLE_SHEET_URL, json=payload)
except: pass
successful_tickets.append(payload)
except Exception as form_error:
print(f"ā Error filling form for ticket {entry['ticket_id']}: {form_error}")
skipped_records.append({'id': entry['ticket_id'], 'reason': 'Error filling Web form'})
# ==========================================
# FINAL RESULTS & COMPLETE RECAP
# ==========================================
if quota_exceeded:
print("\nšØ PROCESS STOPPED BECAUSE ALL API KEYS HAVE REACHED THEIR LIMITS! šØ")
else:
print("\nšš MISSION TOTALLY ACCOMPLISHED! All data has been executed. šš")
# SUCCESS RECAP
print("\n===========================================================================================================")
print("ā
RECAP 1: LIST OF TICKETS SUCCESSFULLY INPUTTED TO DATABASE")
print("===========================================================================================================")
print(f"{'TICKET ID':<16} | {'STUDENT NAME':<30} | {'REQ CATEGORY':<15} | {'EVENT CATEGORY'}")
print("-" * 90)
if len(successful_tickets) > 0:
for t in successful_tickets:
student_disp = t['name'][:27] + "..." if len(t['name']) > 30 else t['name']
print(f"{t['id']:<16} | {student_disp:<30} | {t['req_category']:<15} | {t['category']}")
else: print("No tickets were successfully inputted.")
print("================================================================================\n")
time.sleep(10)
except Exception as e:
print(f"ā Fatal error occurred in the main system: {e}")
finally:
driver.quit()
if __name__ == "__main__":
run_automation()