MCP Tutorial – 12 Modules
Latest Edition for AU-CSE Final Year Students
In this final module, we will bring together all the components created in earlier modules—department setup, course management, semester generation, subject allocation, staff management, timetable creation, attendance API, logs, dashboards, and analytics—into a unified Attendance Automation Platform powered by the Model Context Protocol.
This server loads every tool module and exposes them under a unified namespace.
from mcp import MCPServer

# Import All Tool Modules
import department
import courses
import semesters
import subjects
import staff
import timetable
import attendance
import analytics

# Master server: exposes every module's tools under one unified namespace.
server = MCPServer(title="University Attendance Automation")

# Register each module's tools in a fixed order (same order as the imports).
TOOL_MODULES = (
    department,
    courses,
    semesters,
    subjects,
    staff,
    timetable,
    attendance,
    analytics,
)
for tool_module in TOOL_MODULES:
    server.load_tools_from(tool_module)

if __name__ == "__main__":
    server.start()
Below is the main dashboard HTML that links all 12 modules.
University Attendance Portal
The following JavaScript connects all UI buttons to respective MCP tools.
// Thin wrapper around the MCP client: invoke a named tool with parameters.
async function callTool(name, params = {}) {
  const result = await mcp.call(name, params);
  return result;
}
// Fetch the department list via the MCP tool and log it to the console.
async function loadDept() {
  const departments = await callTool("list_departments");
  console.log(departments);
}
// Fetch the course list via the MCP tool and log it to the console.
async function loadCourse() {
  const courses = await callTool("list_courses");
  console.log(courses);
}
// Generate semesters for course 1 via the MCP tool and log the result.
async function loadSemester() {
  const generated = await callTool("generate_semesters", { course_id: 1 });
  console.log(generated);
}
Using MCP automation:
Deploy the complete MCP system using the script below.
#!/bin/bash
# Deployment entry point: launch the unified MCP master server.
# -e/-u/pipefail: abort on any error or unset variable instead of
# continuing with a half-started deployment.
set -euo pipefail
echo "Starting MCP University Attendance Server"
# exec replaces the shell so signals (SIGTERM from systemd/docker)
# reach the Python process directly.
exec python3 mcp_master_server.py
This module focuses on adding analytics and dashboards inside the Attendance Tracking System using Model Context Protocol (MCP). We will add department-level statistics, staff performance summaries, student attendance insights, and semester‑wise overview screens.
Create a new MCP tool file analytics.py:
from mcp import register_tool
from database import db
@register_tool
async def department_attendance_summary(dept_id: int):
    """Return the average attendance per semester for one department.

    NOTE(review): db.fetch is not awaited here — confirm whether the
    project's db helper is synchronous.
    """
    query = """
        SELECT semester, AVG(attendance) AS avg_attendance
        FROM attendance_summary
        WHERE dept_id = ?
        GROUP BY semester;
    """
    rows = db.fetch(query, (dept_id,))
    return rows
@register_tool
async def staff_teaching_load(staff_id: int):
    """Return the subjects and total teaching hours for one staff member.

    NOTE(review): db.fetch is not awaited here — confirm whether the
    project's db helper is synchronous.
    """
    query = """
        SELECT subject_name, total_hours
        FROM staff_load
        WHERE staff_id = ?;
    """
    rows = db.fetch(query, (staff_id,))
    return rows
Department Dashboard
// Populate the department dashboard panels from the MCP analytics tools.
async function loadDashboard() {
  // Fetch both summaries concurrently instead of one after the other.
  const [semSummary, staff] = await Promise.all([
    mcp.call("department_attendance_summary", { dept_id: 3 }),
    mcp.call("staff_teaching_load", { staff_id: 12 }),
  ]);
  // textContent (not innerHTML) so the JSON text is shown verbatim and
  // cannot be parsed as HTML if it contains markup characters.
  document.getElementById("sem-summary").textContent = JSON.stringify(semSummary, null, 2);
  document.getElementById("staff-load").textContent = JSON.stringify(staff, null, 2);
}
loadDashboard();
// Render a bar chart of semester-wise attendance with Chart.js.
// NOTE(review): `semesters` (labels) and `values` (percentages) are assumed
// to be populated from the analytics response — confirm in the page script.
const ctx = document.getElementById('attendanceChart').getContext('2d');
new Chart(ctx, {
  type: 'bar',
  data: {
    labels: semesters,
    datasets: [{ label: 'Attendance %', data: values }]
  }
});
Assume the variables `semesters` and `values` hold the semester labels and attendance percentages returned by the analytics tools.
With MCP analytics, a department head can check semester-wise attendance averages and each staff member's teaching load.
This module covers safe export formats, scheduled backups (pg_dump), uploads to object storage, and restore procedures for the MCP Attendance System.
Return CSV for month/semester reports. A streaming response lets large reports be sent without holding the whole payload in memory (the example below still buffers the CSV; yield rows incrementally for true streaming).
# app/export.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
import csv, io
router = APIRouter()
@router.get('/reports/monthly/csv')
async def export_monthly_csv(year: int, month: int, semester: int, db=Depends(get_db)):
    """Export per-student attendance percentages for one month as CSV.

    Query params: year, month (1-12), semester.
    Returns a text/csv response with columns student_code, name, attendance_pct.
    """
    start = f"{year}-{month:02d}-01"
    # BUG FIX: the old hard-coded "-31" end date is an invalid date for short
    # months. Use the first day of the following month and a half-open range
    # [start, end), which is correct for every month length.
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    end = f"{next_year}-{next_month:02d}-01"
    query = """
        SELECT st.student_code, st.name,
               (COUNT(*) FILTER (WHERE a.status='present')::float / NULLIF(COUNT(*),0) * 100) AS attendance_pct
        FROM attendance a
        JOIN students st ON st.id = a.student_id
        WHERE a.date >= :start AND a.date < :end AND st.semester = :semester
        GROUP BY st.id, st.student_code, st.name
        ORDER BY attendance_pct DESC;
    """
    rows = await db.fetch_all(query, values={'start': start, 'end': end, 'semester': semester})
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['student_code', 'name', 'attendance_pct'])
    for r in rows:
        writer.writerow([r['student_code'], r['name'], r['attendance_pct']])
    buf.seek(0)
    # Single-chunk response: the CSV is already fully built in memory here.
    return StreamingResponse(iter([buf.read()]), media_type='text/csv')
Simple daily backup with retention rotation.
#!/bin/bash
# /usr/local/bin/backup_mcp.sh — daily gzipped pg_dump with 14-day rotation.
# pipefail makes a failed pg_dump fail the script even though gzip exits 0;
# -e/-u abort on any error or unset variable so a bad dump is never uploaded.
set -euo pipefail
BACKUP_DIR=/backups/mcp
mkdir -p "$BACKUP_DIR"
FNAME="$BACKUP_DIR/mcp_db_$(date +%F).sql.gz"
pg_dump -U postgres mcp_db | gzip > "$FNAME"
# keep 14 days
find "$BACKUP_DIR" -type f -mtime +14 -name 'mcp_db_*.sql.gz' -delete
# optional: upload to S3 (STANDARD_IA is cheaper for rarely-read backups)
/usr/local/bin/aws s3 cp "$FNAME" s3://my-mcp-backups/ --storage-class STANDARD_IA
Cron entry (run daily at 02:00): 0 2 * * * /usr/local/bin/backup_mcp.sh
This script uploads the latest backup to S3 and tags it with the date & env.
# upload_backup.py
import boto3
from pathlib import Path
s3 = boto3.client('s3')
BUCKET = 'my-mcp-backups'
def upload(path: str):
    """Upload one backup file to S3 under the backups/ prefix.

    Args:
        path: local filesystem path of the gzipped dump.
    Raises:
        FileNotFoundError: before calling boto3, if the file does not exist —
            a clearer failure than an S3-side error mid-transfer.
    """
    p = Path(path)
    if not p.is_file():
        raise FileNotFoundError(f"backup file not found: {path}")
    key = f"backups/{p.name}"
    # STANDARD_IA: infrequent-access storage class, cheaper for backups.
    s3.upload_file(str(p), BUCKET, key, ExtraArgs={'StorageClass': 'STANDARD_IA'})
    print('Uploaded', key)
if __name__ == '__main__':
    import sys
    # Fail with a clear usage message instead of an IndexError when the
    # backup-file argument is missing.
    if len(sys.argv) != 2:
        sys.exit('usage: upload_backup.py <backup-file>')
    upload(sys.argv[1])
Restore full DB from a gzipped dump.
#!/bin/bash
# restore_mcp.sh — restore mcp_db from a gzipped pg_dump file.
# pipefail ensures a gunzip failure aborts the restore; -u requires the
# ${1:-} default below so the missing-argument check still runs.
set -euo pipefail
FILE=${1:-}
if [ -z "$FILE" ]; then
  echo "Usage: $0 backupfile.sql.gz"
  exit 1
fi
gunzip -c "$FILE" | psql -U postgres -d mcp_db
Enable WAL archiving and continuous backup for critical systems. Configure postgresql.conf:
# postgresql.conf — continuous WAL archiving for point-in-time recovery
archive_mode = on
# Copy each finished WAL segment to the archive directory; `test ! -f`
# refuses to overwrite a segment that was already archived.
archive_command = 'test ! -f /var/lib/postgresql/wal_archive/%f && cp %p /var/lib/postgresql/wal_archive/%f'
# replica: enough WAL detail for archiving and streaming replication
wal_level = replica
Use Alembic for schema migrations to keep environments in sync.
# Install Alembic and scaffold a migrations environment in ./alembic
pip install alembic
alembic init alembic
# then configure alembic.ini to point to DATABASE_URL
# create migration after model changes (autogenerate diffs the models
# against the live schema — review the generated script before applying)
alembic revision --autogenerate -m "add attendance_audit"
# apply all pending migrations to the current database
alembic upgrade head
You can run pg_dump from a temporary container targeting the DB service.
# run from host to dump the running postgres container
# NOTE(review): --network is given a network ID resolved from the compose
# db service; requires the compose project to be up. pg_dump authenticates
# as postgres against host "db" — confirm PGPASSWORD/.pgpass is available.
# $(date +%F) is inside double quotes, so it expands on the HOST shell.
docker run --rm --network $(docker-compose -f docker-compose.yml ps -q db | xargs docker inspect --format '{{range .NetworkSettings.Networks}}{{.NetworkID}}{{end}}') \
-v $(pwd)/backups:/backups postgres:15 bash -c "pg_dump -h db -U postgres mcp_db | gzip > /backups/mcp_db_$(date +%F).sql.gz"
This module implements validation and business rules to ensure data integrity and correct workflow behavior in the Attendance System.
# file: mcp_server/validators.py
async def staff_assigned(db, staff_id: int, subject_id: int) -> bool:
    """Return True when the staff member is mapped to the subject
    in the staff_subjects table."""
    query = "SELECT 1 FROM staff_subjects WHERE staff_id = :sid AND subject_id = :sub"
    match = await db.fetch_one(query, values={'sid': staff_id, 'sub': subject_id})
    return match is not None
# Usage inside FastAPI route
# Authorization rule: reject the request when the logged-in staff member
# does not teach this subject.
if not await staff_assigned(db, staff_id, subject_id):
    raise HTTPException(status_code=403, detail='Staff not assigned to this subject')
-- Verify a timetable slot exists for semester/day/hour/subject
-- (returns a row only when the class is actually scheduled at that slot).
SELECT 1 FROM timetable_slots
WHERE semester_id = :semester_id AND day_of_week = :day AND slot_order = :hour AND subject_id = :subject_id;
# Python helper
async def student_in_semester(db, student_id: int, semester_id: int) -> bool:
    """Return True when the student record belongs to the given semester."""
    query = 'SELECT 1 FROM students WHERE id = :sid AND semester = :sem'
    row = await db.fetch_one(query, values={'sid': student_id, 'sem': semester_id})
    return row is not None
# Raise error if not
# Guard: the student must belong to the semester being marked.
if not await student_in_semester(db, student_id, semester_id):
    raise HTTPException(400, 'Student not enrolled in this semester')
-- Check if attendance row already exists for that student/session/hour
-- (duplicate guard run before inserting a new mark).
-- NOTE(review): check-then-insert is racy under concurrency; a UNIQUE
-- constraint on (subject_id, student_id, date, hour) would make it reliable.
SELECT 1 FROM attendance
WHERE subject_id = :subject_id AND student_id = :student_id AND date = :date AND hour = :hour;
Only allow marking attendance within a sensible window (e.g., same day ±1 day) unless an admin overrides.
from datetime import datetime, timedelta, timezone
def within_attendance_window(date_str: str, max_days: int = 1) -> bool:
    """Return True when date_str is within max_days of today (UTC).

    Args:
        date_str: class date in ISO format 'YYYY-MM-DD'.
        max_days: allowed distance in days on either side; the default of 1
            keeps the original behavior (same day or +/- one day).
    """
    marked = datetime.fromisoformat(date_str).date()
    # Timezone-aware "now": datetime.utcnow() is deprecated and naive.
    today = datetime.now(timezone.utc).date()
    return abs((today - marked).days) <= max_days
# Reject marks outside the allowed window; payload['date'] is the class date.
if not within_attendance_window(payload['date']):
    raise HTTPException(400, 'Attendance can only be marked within 1 day of class')
-- Prevent two subjects assigned to same semester/day/hour
-- (run before inserting a new timetable slot).
SELECT COUNT(*) FROM timetable_slots
WHERE semester_id = :semester_id AND day_of_week = :day AND slot_order = :hour;
-- if count > 0 then conflict exists
@app.post('/api/attendance/mark')
async def mark_attendance(sessionId: str, studentId: int, status: str, db: Database = Depends(get_db), user=Depends(get_current_user)):
    """Mark one student's attendance for a session, enforcing every business
    rule in order: session exists (404), staff teaches the subject (403),
    student belongs to the semester (400), no duplicate row (409).
    """
    # fetch session metadata (subject, semester, date, hour come from here)
    ses = await db.fetch_one('SELECT * FROM attendance_sessions WHERE id = :sid', values={'sid': sessionId})
    if not ses:
        raise HTTPException(404, 'Session not found')
    # verify staff: only a staff member assigned to the subject may mark it
    if not await staff_assigned(db, user.staff_id, ses['subject_id']):
        raise HTTPException(403, 'Not authorized')
    # verify student in semester
    if not await student_in_semester(db, studentId, ses['semester_id']):
        raise HTTPException(400, 'Student not in semester')
    # prevent duplicates
    # NOTE(review): check-then-insert is racy under concurrent requests; a
    # UNIQUE constraint on (subject_id, student_id, date, hour) would make
    # the 409 reliable — confirm the schema has one.
    exists = await db.fetch_one('SELECT 1 FROM attendance WHERE subject_id=:sub AND student_id=:stu AND date=:d AND hour=:h', values={'sub': ses['subject_id'],'stu': studentId,'d': ses['date'],'h': ses['hour']})
    if exists:
        raise HTTPException(409, 'Attendance already marked')
    # insert the mark, attributing it to the logged-in staff member
    await db.execute('INSERT INTO attendance(subject_id, staff_id, date, hour, student_id, status) VALUES(:sub, :staff, :d, :h, :stu, :status)', values={'sub': ses['subject_id'],'staff': user.staff_id,'d': ses['date'],'h': ses['hour'],'stu': studentId,'status': status})
    return { 'status':'ok' }
Create SQL queries and FastAPI endpoints to produce daily, date-range, monthly and semester reports for attendance.
-- Daily attendance summary per subject for a single date.
-- COUNT(*) FILTER (WHERE ...) is PostgreSQL's conditional count.
-- NOTE(review): SELECT/GROUP BY reference s.subject_id on the semesters
-- alias; that column more plausibly belongs to subjects (sub.id) — verify
-- against the actual schema.
SELECT s.subject_id, sub.code AS subject_code, sub.name AS subject_name,
       COUNT(*) FILTER (WHERE a.status='present') AS present_count,
       COUNT(*) FILTER (WHERE a.status='absent') AS absent_count,
       COUNT(*) AS total_taken
FROM attendance a
JOIN subjects sub ON sub.id = a.subject_id
JOIN semesters s ON s.id = sub.semester_id
WHERE a.date = '2025-08-10'
GROUP BY s.subject_id, sub.code, sub.name;
-- Attendance percent per student in a date range.
-- ::float forces real (not integer) division; NULLIF(COUNT(*),0) yields
-- NULL instead of a division-by-zero error when a student has no rows.
SELECT st.student_code, st.name,
       (COUNT(*) FILTER (WHERE a.status='present')::float / NULLIF(COUNT(*),0) * 100) AS attendance_pct
FROM attendance a
JOIN students st ON st.id = a.student_id
WHERE a.date BETWEEN '2025-08-01' AND '2025-08-31' AND st.semester = 3
GROUP BY st.id, st.student_code, st.name
ORDER BY attendance_pct DESC;
# file: app/reports.py
from fastapi import APIRouter
from databases import Database
from fastapi import Depends
router = APIRouter()
@router.get('/reports/monthly')
async def monthly_report(year: int, month: int, semester: int, db: Database = Depends(get_db)):
    """Per-student attendance percentage for one calendar month and semester,
    ordered best to worst.

    Query params: year, month (1-12), semester.
    """
    start = f"{year}-{month:02d}-01"
    # BUG FIX: the old hard-coded "-31" end is an invalid date for short
    # months. Use the first day of the following month with a half-open
    # range [start, end), which is correct for every month length.
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    end = f"{next_year}-{next_month:02d}-01"
    query = """
        SELECT st.student_code, st.name,
               (COUNT(*) FILTER (WHERE a.status='present')::float / NULLIF(COUNT(*),0) * 100) AS attendance_pct
        FROM attendance a
        JOIN students st ON st.id = a.student_id
        WHERE a.date >= :start AND a.date < :end AND st.semester = :semester
        GROUP BY st.id, st.student_code, st.name
        ORDER BY attendance_pct DESC;
    """
    rows = await db.fetch_all(query, values={"start": start, "end": end, "semester": semester})
    return rows
-- Average attendance % across all students in a semester.
-- NOTE(review): the inner query aggregates each student's ENTIRE attendance
-- history (no date or subject filter) before the semester filter is applied
-- in the outer query — confirm that is the intended definition of "average".
SELECT st.course_id, st.semester,
       AVG(pct) AS avg_attendance_pct
FROM (
    -- per-student overall percentage
    SELECT student_id, (COUNT(*) FILTER (WHERE status='present')::float / NULLIF(COUNT(*),0) * 100) AS pct
    FROM attendance
    GROUP BY student_id
) sub
JOIN students st ON st.id = sub.student_id
WHERE st.semester = 3
GROUP BY st.course_id, st.semester;
# file: app/export.py
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import csv, io
router = APIRouter()
@router.get('/reports/export/monthly')
async def export_monthly_csv(year: int, month: int, semester: int, db: Database = Depends(get_db)):
    """Export the monthly attendance report as a CSV download.

    NOTE(review): delegates to monthly_query(), which is not defined in this
    snippet — presumably it runs the same query as /reports/monthly; confirm.
    Database/Depends/get_db are assumed imported at module level.
    """
    rows = await monthly_query(db, year, month, semester)
    buf = io.StringIO()
    writer = csv.writer(buf)
    # header row first, then one line per student
    writer.writerow(['student_code','name','attendance_pct'])
    for r in rows:
        writer.writerow([r['student_code'], r['name'], r['attendance_pct']])
    buf.seek(0)
    # single-chunk "stream": the CSV is already fully built in memory
    return StreamingResponse(iter([buf.read()]), media_type='text/csv')
Use calendar.monthrange or SQL date functions to compute the real end of month. Next, build a simple web UI to mark attendance for 60-student classes and a bulk API endpoint to accept batch updates.
Copy this file as attendance.html
<!doctype html>
<html><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/><title>Attendance</title></head><body>
<h2>Mark Attendance</h2>
<div>
<label>SubjectId:<input id="subject" /></label>
<label>Date:<input id="date" type="date" /></label>
<label>Hour:<input id="hour" type="number" min="1" max="8" /></label>
<button onclick="loadStudents()">Load Students</button>
</div>
<div id="students"></div>
<script>
// Fetch the class roster and render one row per student with a checkbox.
async function loadStudents(){
  const subject = document.getElementById('subject').value;
  // call API to fetch students in this semester
  const res = await fetch(`/api/students?subjectId=${subject}`);
  const students = await res.json();
  const container = document.getElementById('students');
  container.innerHTML = '';
  students.forEach(s => {
    const row = document.createElement('div');
    // BUG FIX: submitBulk() reads `#students input[type=checkbox]` with a
    // data-id attribute, but no checkbox was ever rendered, so submissions
    // were always empty. Build the row with DOM APIs (this also keeps
    // student names from being interpreted as HTML).
    const box = document.createElement('input');
    box.type = 'checkbox';
    box.checked = true;                         // default everyone to present
    box.setAttribute('data-id', s.id);
    row.appendChild(box);
    row.appendChild(document.createTextNode(` ${s.name} (${s.student_code})`));
    container.appendChild(row);
  });
}
// Collect every rendered checkbox and POST the whole class in one batch.
async function submitBulk(){
  const subject = document.getElementById('subject').value;
  const date = document.getElementById('date').value;
  // default to hour 1 when the field is left blank
  const hour = parseInt(document.getElementById('hour').value||'1');
  const checks = Array.from(document.querySelectorAll('#students input[type=checkbox]'));
  // checked box = present, unchecked = absent; data-id carries the student id
  const attendance = checks.map(c => ({ studentId: c.getAttribute('data-id'), status: c.checked? 'present' : 'absent'}));
  await fetch('/api/attendance/bulk',{ method:'POST', headers:{'Content-Type':'application/json'}, body: JSON.stringify({ subjectId: subject, date, hour, attendance }) });
  alert('Submitted');
}
</script>
<button onclick="submitBulk()">Submit Attendance</button>
</body></html>
# file: app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import os

import databases

# Read the connection string from the environment so credentials are not
# hard-coded in source; the previous literal is kept only as a dev fallback.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://postgres:password@localhost/mcp_db"
)
db = databases.Database(DATABASE_URL)
app = FastAPI()
class AttRow(BaseModel):
    # One student's mark within a bulk submission.
    studentId: int
    status: str  # 'present' or 'absent' as sent by the UI (not enforced here)

class BulkIn(BaseModel):
    # Whole-class submission: one subject/date/hour plus per-student rows.
    subjectId: int
    date: str  # ISO date 'YYYY-MM-DD' from the <input type="date"> field
    hour: int
    attendance: List[AttRow]
@app.post('/api/attendance/bulk')
async def bulk_att(payload: BulkIn):
    """Store one attendance row per student for a subject/date/hour batch.

    Returns {'status': 'success', 'stored': N} on success.
    Raises HTTP 400 for an empty batch or an unknown status value.
    """
    # Basic validation
    if not payload.attendance:
        raise HTTPException(400, 'No attendance rows')
    # Reject unknown statuses up front: the reporting queries count rows
    # with status 'present'/'absent', so anything else would silently skew
    # every percentage.
    invalid = [r.studentId for r in payload.attendance
               if r.status not in ('present', 'absent')]
    if invalid:
        raise HTTPException(400, f'Invalid status for students: {invalid}')
    query = "INSERT INTO attendance(subject_id, staff_id, date, hour, student_id, status) VALUES(:subject_id, :staff_id, :date, :hour, :student_id, :status)"
    # staff_id will be set based on session - simplified here as NULL
    # One transaction: a mid-batch failure stores nothing (all-or-nothing).
    async with db.transaction():
        for r in payload.attendance:
            await db.execute(query, values={
                'subject_id': payload.subjectId,
                'staff_id': None,
                'date': payload.date,
                'hour': payload.hour,
                'student_id': r.studentId,
                'status': r.status,
            })
    return {'status': 'success', 'stored': len(payload.attendance)}
MCP Tutorial – Module Index MCP Tutorial – 12 Modules Latest Edition for AU-CSE Final year Students Module 1 Introducti...