The pipeline has a single entry point: python -m src.main. It runs in one of three modes, selected by command-line flags.
Modes
Default — fetch from Outlook
- Connects to Microsoft Graph via MSAL client credentials
- Fetches unread emails with "Metaweave Forms:" in the subject
- Processes each one: Parse → Map → Write
- Marks each email as read after successful processing
- Commits the session per email; rolls back on error
- Logs "Done: X/N processed successfully" at the end
This is what you’d schedule (cron, Cloud Scheduler, etc.).
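As an illustration of the fetch step, the unread-mail filter can be built as a Graph query URL with the standard library alone. This is a minimal sketch, not the pipeline's fetcher: `build_unread_query`, the `/users/{mailbox}/messages` path, and the example mailbox address are assumptions, while the filter expression is the one shown in the per-email walkthrough on this page.

```python
from urllib.parse import quote, urlencode

GRAPH_BASE = "https://graph.microsoft.com/v1.0"


def build_unread_query(mailbox: str, subject_prefix: str = "Metaweave Forms") -> str:
    """Return a Graph URL listing unread messages whose subject contains the prefix.

    Hypothetical helper: the mailbox path is an assumption, not the pipeline's code.
    """
    # OData string literals escape a single quote by doubling it.
    literal = subject_prefix.replace("'", "''")
    params = {"$filter": f"isRead eq false and contains(subject, '{literal}')"}
    # quote_via=quote encodes spaces as %20; safe="$" keeps the OData "$filter" key readable.
    query = urlencode(params, safe="$", quote_via=quote)
    return f"{GRAPH_BASE}/users/{quote(mailbox, safe='@')}/messages?{query}"


if __name__ == "__main__":
    print(build_unread_query("reports@example.com"))
```

The request itself would still need a bearer token obtained through the client-credentials flow described above.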
Single-file — process a saved email
python -m src.main --file /path/to/email.txt
- Skips the Outlook fetch entirely
- Reads the body from a local file
- Runs Parse → Map → Write
- Useful for: testing, replay, debugging a specific submission
The file should contain the full email body, preferably including the BEGIN MW FORM DATA / END MW FORM DATA markers so that the parser takes the encrypted path. Without the markers, the parser falls back to plain-text parsing, which is limited.
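To check whether a saved file will take the encrypted path, you can look for the markers yourself before replaying it. The sketch below is an assumption about the layout (payload between the two markers, possibly wrapped across lines) and is not the pipeline's parser; `extract_payload` is a hypothetical name.

```python
import re
from typing import Optional

# Assumed layout: BEGIN marker, payload (possibly line-wrapped), END marker.
_MARKER_RE = re.compile(
    r"BEGIN MW FORM DATA\s*(?P<payload>.*?)\s*END MW FORM DATA",
    re.DOTALL,
)


def extract_payload(body_text: str) -> Optional[str]:
    """Return the text between the markers, or None if either marker is missing."""
    match = _MARKER_RE.search(body_text)
    if match is None:
        return None
    # Assumption: the payload is a whitespace-free encoding, so wrapped lines
    # can be joined by dropping all whitespace.
    return "".join(match.group("payload").split())


if __name__ == "__main__":
    sample = "Hello\nBEGIN MW FORM DATA\nQUJD\nREVG\nEND MW FORM DATA\nBye"
    print(extract_payload(sample))
```

A return value of None suggests the replay would fall back to text parsing.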
Schema setup — create tables
python -m src.main --create-tables
- Calls Base.metadata.create_all(engine) on the configured database
- Creates the 17 tables if missing; no-ops if they exist
- Exits immediately — does not process any emails
Run this once per database. Idempotent.
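The three modes map naturally onto a small argparse setup. The following is a sketch of how such flag handling could look, not the actual contents of src.main; `parse_args` and `select_mode` are hypothetical names, and treating the two flags as mutually exclusive is an assumption.

```python
import argparse
from typing import Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the two documented flags; no flags means the default Outlook mode."""
    parser = argparse.ArgumentParser(prog="python -m src.main")
    # Assumption: --file and --create-tables are never combined.
    modes = parser.add_mutually_exclusive_group()
    modes.add_argument("--file", metavar="PATH", help="process one saved email body")
    modes.add_argument("--create-tables", action="store_true", help="create tables and exit")
    return parser.parse_args(argv)


def select_mode(args: argparse.Namespace) -> str:
    """Map parsed flags onto one of the three modes."""
    if args.create_tables:
        return "create-tables"
    if args.file is not None:
        return "single-file"
    return "outlook"


if __name__ == "__main__":
    print(select_mode(parse_args()))
```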
Scheduled invocation
A typical cron entry (every 10 minutes):
*/10 * * * * cd /opt/metaweave-pipeline && /opt/metaweave-pipeline/.venv/bin/python -m src.main >> /var/log/metaweave-pipeline.log 2>&1
For Cloud Scheduler, Lambda, or Cloud Functions, the idea is the same: invoke the entry point on the cadence you want.
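For a function-style scheduler, one option is a thin wrapper that runs the same command as the cron entry in a child process. This is a sketch under assumptions: `run_pipeline` and `handler` are hypothetical names, the handler signature follows the AWS Lambda Python convention, and the wrapper assumes that the pipeline exits non-zero on a fatal error and that the working directory is the project root.

```python
import subprocess
import sys
from typing import Sequence

# Mirrors the cron entry; assumes the current working directory is the project root.
DEFAULT_COMMAND = (sys.executable, "-m", "src.main")


def run_pipeline(command: Sequence[str] = DEFAULT_COMMAND, timeout_s: int = 540) -> int:
    """Run one pipeline pass in a child process and return its exit code."""
    completed = subprocess.run(list(command), timeout=timeout_s, check=False)
    return completed.returncode


def handler(event, context):
    """Hypothetical Lambda-style entry point that fails loudly on a non-zero exit."""
    exit_code = run_pipeline()
    if exit_code != 0:
        raise RuntimeError(f"pipeline exited with code {exit_code}")
    return {"exit_code": exit_code}
```

Keeping the timeout below the scheduler interval helps avoid overlapping runs.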
Log output
The pipeline logs at INFO level with timestamps. A successful run looks like:
2026-04-13 12:35:01,234 INFO Fetching unread Metaweave Forms emails…
2026-04-13 12:35:02,567 INFO Found 3 unread email(s)
2026-04-13 12:35:02,890 INFO Processing AAMkAGI… - 'Metaweave Forms: MT ABC - Noon Report - 13.04.2026'
2026-04-13 12:35:03,123 INFO Stored: MT ABC NOON 2026-04-13 12:00:00+00:00 (report_id=1234, events=2, bunker_rob=4)
2026-04-13 12:35:03,456 INFO Marked AAMkAGI… as read
2026-04-13 12:35:03,789 INFO Processing AAMkAGJ… - 'Metaweave Forms: MV XYZ - Departure Report - 13.04.2026'
2026-04-13 12:35:04,012 INFO Stored: MV XYZ DEPARTURE 2026-04-13 08:30:00+00:00 (report_id=1235, events=1, bunker_rob=4)
…
2026-04-13 12:35:05,789 INFO Done: 3/3 processed successfully
A failing email is logged but doesn’t stop the run:
2026-04-13 12:35:04,500 ERROR Failed to process AAMkAGK…: Could not decrypt payload: Invalid padding bytes
2026-04-13 12:35:05,789 INFO Done: 2/3 processed successfully
Failed emails are not marked as read by default — they’ll be re-attempted on the next run. Investigate by reading the body and replaying with --file.
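Because every run ends with a "Done: X/N" line, a monitoring script can detect partial failures by reading the log. The sketch below assumes the log format shown in the samples above; `last_run_summary` is a hypothetical helper, not part of the pipeline.

```python
import re
from typing import Iterable, Optional, Tuple

# Matches the summary line format shown in the sample output.
_DONE_RE = re.compile(r"INFO Done: (\d+)/(\d+) processed successfully")


def last_run_summary(lines: Iterable[str]) -> Optional[Tuple[int, int]]:
    """Return (succeeded, total) from the last 'Done:' line, or None if there is none."""
    summary = None
    for line in lines:
        match = _DONE_RE.search(line)
        if match:
            summary = (int(match.group(1)), int(match.group(2)))
    return summary


if __name__ == "__main__":
    with open("/var/log/metaweave-pipeline.log", encoding="utf-8") as fh:
        result = last_run_summary(fh)
    if result is not None and result[0] < result[1]:
        print(f"{result[1] - result[0]} email(s) failed in the last run")
```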
What happens per email
1. fetcher: GET /messages?$filter=isRead eq false and contains(subject, 'Metaweave Forms')
→ returns FetchedEmail(message_id, subject, body_text, received_datetime, vessel_name, report_type_raw, report_date)
2. parser.parse_email_body(body_text)
→ extracts BEGIN/END markers
→ decrypts AES-128-CBC payload
→ returns ParseResult(form_data: dict, report_type_raw, form_version)
3. mapper.map_report(parse_result)
→ coerces 92 scalar fields, builds 11 child arrays
→ returns dict with vessel_info, voyage_number, Report instance, child lists
4. writer.write_report(session, mapped, email_message_id)
→ upsert Vessel by IMO
→ upsert Voyage by (vessel_id, voyage_number)
→ delete existing Report by (vessel_id, report_type, report_datetime_utc) — CASCADE drops children
→ insert new Report + 11 child arrays
→ flush to get report_id
5. session.commit() # per email; rollback on exception
6. fetcher: PATCH /messages/{id} isRead=true
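The control flow around steps 2 to 6 can be sketched as a loop that commits per email, rolls back on failure, and only marks an email as read after a successful commit. This is an illustration, not the pipeline's code: `process_all`, `process_one`, and `mark_read` are hypothetical names, and logging a failed mark-as-read instead of aborting is a design choice of this sketch.

```python
import logging
from typing import Any, Callable, Iterable, Tuple

log = logging.getLogger(__name__)


def process_all(
    emails: Iterable[Any],
    session: Any,                               # needs commit() and rollback()
    process_one: Callable[[Any, Any], None],    # parse -> map -> write for one email
    mark_read: Callable[[Any], None],
) -> Tuple[int, int]:
    """Process each email in its own transaction and return (succeeded, total)."""
    ok = 0
    total = 0
    for email in emails:
        total += 1
        try:
            process_one(session, email)
            session.commit()
        except Exception as exc:
            # Undo partial writes; the email stays unread and is retried next run.
            session.rollback()
            log.error("Failed to process %s: %s", email, exc)
            continue
        ok += 1
        try:
            mark_read(email)
        except Exception as exc:
            # The data is already stored; a retry next run is safe if writes are idempotent.
            log.warning("Stored %s but could not mark it as read: %s", email, exc)
    log.info("Done: %d/%d processed successfully", ok, total)
    return ok, total
```

Marking as read only after the commit means a crash between the two steps causes a harmless re-process rather than a lost report.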
Idempotency and replay
Re-running the same email is safe because:
- The Vessel upsert keys on IMO (won’t duplicate)
- The Voyage upsert keys on (vessel_id, voyage_number)
- The Report uses delete-then-insert on (vessel_id, report_type, report_datetime_utc)
So the same email always produces the same final database state. This is also how corrections work — re-submit a corrected Noon and it overwrites cleanly.
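The delete-then-insert pattern can be demonstrated in isolation. The sketch below uses the standard-library sqlite3 module and a single simplified table instead of the pipeline's SQLAlchemy models; the table layout and the `write_report` signature are assumptions made for illustration.

```python
import sqlite3


def write_report(conn, vessel_id, report_type, report_dt_utc, payload):
    """Replace any report with the same (vessel_id, report_type, report_datetime_utc)."""
    with conn:  # one transaction: commits on success, rolls back on exception
        conn.execute(
            "DELETE FROM report WHERE vessel_id = ? AND report_type = ? "
            "AND report_datetime_utc = ?",
            (vessel_id, report_type, report_dt_utc),
        )
        conn.execute(
            "INSERT INTO report (vessel_id, report_type, report_datetime_utc, payload) "
            "VALUES (?, ?, ?, ?)",
            (vessel_id, report_type, report_dt_utc, payload),
        )


# Simplified stand-in schema (hypothetical), with the natural key enforced as UNIQUE.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE report (
        report_id INTEGER PRIMARY KEY,
        vessel_id INTEGER NOT NULL,
        report_type TEXT NOT NULL,
        report_datetime_utc TEXT NOT NULL,
        payload TEXT,
        UNIQUE (vessel_id, report_type, report_datetime_utc)
    )"""
)

if __name__ == "__main__":
    write_report(conn, 1, "NOON", "2026-04-13T12:00:00+00:00", "first")
    write_report(conn, 1, "NOON", "2026-04-13T12:00:00+00:00", "corrected")
    print(conn.execute("SELECT payload FROM report").fetchall())
```

One caveat of this pattern in general: when the primary key comes from a sequence, the replacement row may receive a new report_id, so downstream consumers should key on the natural key rather than on the surrogate id.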
Mark-as-read is the only side effect on re-run — once marked, the default-mode fetcher skips it. To re-process, either:
- Mark unread in Outlook manually
- Save the body to a file and use --file
Failure modes
| Failure | What you’ll see | What to do |
|---|---|---|
| Bad AES padding | ERROR Failed to process … Could not decrypt payload | The submission was tampered with or encrypted with the wrong key. Check the email body. |
| Subject regex mismatch | Email skipped silently (filter excludes it) | Crew didn’t follow subject convention. Ask them to resubmit with the correct format. |
| DB connection refused | ERROR could not connect to server | Check CLOUD_SQL_INSTANCE_CONNECTION_NAME and service account permissions. |
| Outlook auth fails | ERROR AcquireTokenForClient … invalid_client | Rotate AZURE_CLIENT_SECRET, check tenant ID. |
| Unique constraint violation | IntegrityError on insert | Two emails for the same (vessel, type, datetime) were processed in the same second. Re-run; the second will replace the first. |