The pipeline has a single entry point, python -m src.main, which runs in one of three modes selected by command-line flags.

Modes

Default — fetch from Outlook

python -m src.main
  • Connects to Microsoft Graph via MSAL client credentials
  • Fetches unread emails with "Metaweave Forms:" in the subject
  • Processes each one: Parse → Map → Write
  • Marks each email as read after successful processing
  • Commits the session per email; rolls back on error
  • Logs "Done: X/N processed successfully" at the end
This is the mode to run on a schedule (cron, Cloud Scheduler, etc.).
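
The subject line carries the vessel name, report type, and report date, as in the sample subject "Metaweave Forms: MT ABC - Noon Report - 13.04.2026" shown under Log output. The following is a minimal sketch of how such a subject could be split into those fields. The regular expression, the `SubjectFields` type, and the `parse_subject` helper are assumptions inferred from that sample, not the pipeline's actual code.

```python
import re
from datetime import date, datetime
from typing import NamedTuple, Optional

# Assumed pattern, inferred from sample subjects such as
# "Metaweave Forms: MT ABC - Noon Report - 13.04.2026".
SUBJECT_RE = re.compile(
    r"^Metaweave Forms:\s*(?P<vessel>.+?)\s+-\s+(?P<report_type>.+?)"
    r"\s+-\s+(?P<date>\d{2}\.\d{2}\.\d{4})\s*$"
)


class SubjectFields(NamedTuple):
    """Hypothetical container for the fields encoded in the subject."""
    vessel_name: str
    report_type_raw: str
    report_date: date


def parse_subject(subject: str) -> Optional[SubjectFields]:
    """Return the parsed fields, or None if the subject does not match."""
    match = SUBJECT_RE.match(subject)
    if match is None:
        return None
    try:
        # The sample date is day.month.year.
        report_date = datetime.strptime(match["date"], "%d.%m.%Y").date()
    except ValueError:
        # Digits in the right shape but not a real calendar date.
        return None
    return SubjectFields(match["vessel"], match["report_type"], report_date)
```

Returning None for a non-matching subject lets a caller skip the email instead of raising.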

Single-file — process a saved email

python -m src.main --file /path/to/email.txt
  • Skips the Outlook fetch entirely
  • Reads the body from a local file
  • Runs Parse → Map → Write
  • Useful for: testing, replay, debugging a specific submission
The file should contain the full email body, preferably including the BEGIN MW FORM DATA / END MW FORM DATA markers so that the parser takes the encrypted-payload path. Without the markers, the parser falls back to plain-text parsing, which is limited.
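
To check whether a saved file will take the encrypted path, it can help to look for the markers before replaying it. The sketch below is illustrative only: the marker phrases come from this page, but the exact line layout around them (dashes, wrapping) and the `extract_payload` helper are assumptions, as is the idea that whitespace inside the payload is insignificant.

```python
import re
from typing import Optional

# Assumption: each marker phrase sits on its own line, possibly wrapped in
# dashes or other decoration. Only the two phrases come from the docs.
MARKER_RE = re.compile(
    r"^[^\n]*BEGIN MW FORM DATA[^\n]*\n(?P<payload>.*?)\n[^\n]*END MW FORM DATA",
    re.DOTALL | re.MULTILINE,
)


def extract_payload(body_text: str) -> Optional[str]:
    """Return the text between the markers, or None if they are absent."""
    match = MARKER_RE.search(body_text)
    if match is None:
        return None
    # Assumption: the payload is whitespace-insensitive (for example base64),
    # so line breaks added by mail clients can be removed.
    return "".join(match["payload"].split())
```

A None result means the file would go through the limited text-parsing fallback.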

Schema setup — create tables

python -m src.main --create-tables
  • Calls Base.metadata.create_all(engine) on the configured database
  • Creates the 17 tables if missing; no-ops if they exist
  • Exits immediately — does not process any emails
Run this once per database. Idempotent.
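
As an illustration of how the three modes could be dispatched, here is a minimal argparse sketch. Only the flag names `--file` and `--create-tables` come from this page; the `build_parser` and `select_mode` helpers, the mode labels, and the choice to make the two flags mutually exclusive are assumptions.

```python
import argparse
from typing import Optional, Sequence


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for the two documented flags."""
    parser = argparse.ArgumentParser(prog="python -m src.main")
    # Assumption: the flags cannot be combined.
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--file", metavar="PATH",
                       help="process a saved email body instead of fetching")
    group.add_argument("--create-tables", action="store_true",
                       help="create the database tables and exit")
    return parser


def select_mode(argv: Optional[Sequence[str]] = None) -> str:
    """Map command-line arguments to a (hypothetical) mode label."""
    args = build_parser().parse_args(argv)
    if args.create_tables:
        return "create-tables"
    if args.file is not None:
        return "single-file"
    return "outlook"  # default: fetch unread emails from Outlook
```

With no flags the sketch selects the default Outlook mode, matching the behaviour described above.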

Scheduled invocation

A typical cron entry (every 10 minutes):
*/10 * * * * cd /opt/metaweave-pipeline && /opt/metaweave-pipeline/.venv/bin/python -m src.main >> /var/log/metaweave-pipeline.log 2>&1
For Cloud Scheduler, Lambda, or Cloud Functions the idea is the same: invoke the entry point at whatever cadence you want.

Log output

The pipeline logs at INFO level with timestamps. A successful run looks like:
2026-04-13 12:35:01,234 INFO     Fetching unread Metaweave Forms emails…
2026-04-13 12:35:02,567 INFO     Found 3 unread email(s)
2026-04-13 12:35:02,890 INFO     Processing AAMkAGI… - 'Metaweave Forms: MT ABC - Noon Report - 13.04.2026'
2026-04-13 12:35:03,123 INFO     Stored: MT ABC NOON 2026-04-13 12:00:00+00:00 (report_id=1234, events=2, bunker_rob=4)
2026-04-13 12:35:03,456 INFO     Marked AAMkAGI… as read
2026-04-13 12:35:03,789 INFO     Processing AAMkAGJ… - 'Metaweave Forms: MV XYZ - Departure Report - 13.04.2026'
2026-04-13 12:35:04,012 INFO     Stored: MV XYZ DEPARTURE 2026-04-13 08:30:00+00:00 (report_id=1235, events=1, bunker_rob=4)

2026-04-13 12:35:05,789 INFO     Done: 3/3 processed successfully
A failing email is logged but doesn’t stop the run:
2026-04-13 12:35:04,500 ERROR    Failed to process AAMkAGK…: Could not decrypt payload: Invalid padding bytes
2026-04-13 12:35:05,789 INFO     Done: 2/3 processed successfully
Failed emails are not marked as read by default — they’ll be re-attempted on the next run. Investigate by reading the body and replaying with --file.
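
The line layout in the samples above (timestamp with milliseconds, level name padded to eight characters, then the message) can be reproduced with the standard logging module. This is a sketch of one configuration that yields that layout, not necessarily the one the pipeline uses; `LOG_FORMAT`, `configure_logging`, and `log_summary` are hypothetical names.

```python
import logging

# "%(levelname)-8s" left-pads the level to eight characters, which gives the
# column alignment seen in the sample output.
LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"


def configure_logging() -> None:
    """Send INFO-and-above records to stderr in the sample layout."""
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)


def log_summary(logger: logging.Logger, succeeded: int, total: int) -> None:
    """Emit the end-of-run summary line."""
    logger.info("Done: %d/%d processed successfully", succeeded, total)
```

The default `asctime` format already includes the comma-separated milliseconds, so no `datefmt` is needed.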

What happens per email

1. fetcher: GET /messages?$filter=isRead eq false and contains(subject, 'Metaweave Forms')
   → returns FetchedEmail(message_id, subject, body_text, received_datetime, vessel_name, report_type_raw, report_date)

2. parser.parse_email_body(body_text)
   → extracts BEGIN/END markers
   → decrypts AES-128-CBC payload
   → returns ParseResult(form_data: dict, report_type_raw, form_version)

3. mapper.map_report(parse_result)
   → coerces 92 scalar fields, builds 11 child arrays
   → returns dict with vessel_info, voyage_number, Report instance, child lists

4. writer.write_report(session, mapped, email_message_id)
   → upsert Vessel by IMO
   → upsert Voyage by (vessel_id, voyage_number)
   → delete existing Report by (vessel_id, report_type, report_datetime_utc) — CASCADE drops children
   → insert new Report + 11 child arrays
   → flush to get report_id

5. session.commit()  # per email; rollback on exception

6. fetcher: PATCH /messages/{id}  isRead=true
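
The commit, rollback, and mark-as-read behaviour in steps 5 and 6 can be sketched as a loop. This is a simplified illustration under assumptions: `process_all`, the `Session` protocol, and the `handle` and `mark_read` callables are hypothetical stand-ins for the pipeline's parse, map, write, and fetcher code.

```python
import logging
from typing import Callable, Iterable, Protocol, Tuple

logger = logging.getLogger(__name__)


class Session(Protocol):
    """Minimal shape of a database session (hypothetical)."""
    def commit(self) -> None: ...
    def rollback(self) -> None: ...


def process_all(
    emails: Iterable[Tuple[str, str]],             # (message_id, body_text)
    session: Session,
    handle: Callable[[Session, str, str], None],   # parse, map, write
    mark_read: Callable[[str], None],
) -> Tuple[int, int]:
    """Process each email in its own transaction; return (succeeded, total)."""
    succeeded = total = 0
    for message_id, body_text in emails:
        total += 1
        try:
            handle(session, message_id, body_text)
            session.commit()
        except Exception as exc:
            # One bad email must not stop the run or leave partial writes.
            session.rollback()
            logger.error("Failed to process %s: %s", message_id, exc)
            continue
        # Only reached after a successful commit, so failed emails stay unread.
        mark_read(message_id)
        succeeded += 1
    logger.info("Done: %d/%d processed successfully", succeeded, total)
    return succeeded, total
```

Marking the email as read only after the commit is what makes a failed email eligible for the next run.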

Idempotency and replay

Re-running the same email is safe because:
  • The Vessel upsert keys on IMO (won’t duplicate)
  • The Voyage upsert keys on (vessel_id, voyage_number)
  • The Report uses delete-then-insert on (vessel_id, report_type, report_datetime_utc)
So the same email always produces the same final database state. This is also how corrections work: re-submit a corrected Noon report and it overwrites the earlier one cleanly.

Mark-as-read is the one side effect that affects re-runs: once an email is marked read, the default-mode fetcher skips it. To re-process it, either:
  • Mark unread in Outlook manually
  • Save the body to a file and use --file
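
The upsert and delete-then-insert pattern can be demonstrated end to end with an in-memory SQLite database. This is a reduced sketch, not the pipeline's schema: the three tables, the column names other than the documented keys, and the `connect` and `write_report` helpers are assumptions, and the real writer uses a SQLAlchemy session rather than raw SQL.

```python
import sqlite3
from typing import Sequence

# Reduced, hypothetical schema: one vessel table, one report table, and a
# single child table standing in for the report's child arrays.
SCHEMA = """
CREATE TABLE vessel (id INTEGER PRIMARY KEY, imo TEXT UNIQUE NOT NULL, name TEXT);
CREATE TABLE report (
    id INTEGER PRIMARY KEY,
    vessel_id INTEGER NOT NULL REFERENCES vessel(id),
    report_type TEXT NOT NULL,
    report_datetime_utc TEXT NOT NULL,
    UNIQUE (vessel_id, report_type, report_datetime_utc)
);
CREATE TABLE event (
    id INTEGER PRIMARY KEY,
    report_id INTEGER NOT NULL REFERENCES report(id) ON DELETE CASCADE,
    description TEXT
);
"""


def connect() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite needs this for CASCADE
    conn.executescript(SCHEMA)
    return conn


def write_report(conn: sqlite3.Connection, imo: str, name: str,
                 report_type: str, when_utc: str,
                 events: Sequence[str]) -> int:
    # Upsert the vessel, keyed on IMO.
    conn.execute(
        "INSERT INTO vessel (imo, name) VALUES (?, ?) "
        "ON CONFLICT(imo) DO UPDATE SET name = excluded.name",
        (imo, name),
    )
    vessel_id = conn.execute(
        "SELECT id FROM vessel WHERE imo = ?", (imo,)).fetchone()[0]
    key = (vessel_id, report_type, when_utc)
    # Delete any earlier report with the same key; CASCADE removes its children.
    conn.execute(
        "DELETE FROM report WHERE vessel_id = ? AND report_type = ? "
        "AND report_datetime_utc = ?", key)
    cursor = conn.execute(
        "INSERT INTO report (vessel_id, report_type, report_datetime_utc) "
        "VALUES (?, ?, ?)", key)
    report_id = cursor.lastrowid
    conn.executemany(
        "INSERT INTO event (report_id, description) VALUES (?, ?)",
        [(report_id, description) for description in events])
    conn.commit()
    return report_id
```

Calling `write_report` twice with the same vessel, type, and timestamp leaves exactly one vessel row and one report row, and only the children from the second call.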

Failure modes

| Failure | What you’ll see | What to do |
|---|---|---|
| Bad AES padding | ERROR Failed to process … Could not decrypt payload | The submission was tampered with or encrypted with the wrong key. Check the email body. |
| Subject regex mismatch | Email skipped silently (filter excludes it) | The crew didn’t follow the subject convention. Ask them to resubmit with the correct format. |
| DB connection refused | ERROR could not connect to server | Check CLOUD_SQL_INSTANCE_CONNECTION_NAME and service account permissions. |
| Outlook auth fails | ERROR AcquireTokenForClient … invalid_client | Rotate AZURE_CLIENT_SECRET, check tenant ID. |
| Unique constraint violation | IntegrityError on insert | Two emails for the same (vessel, type, datetime) were processed in the same second. Re-run; the second will replace the first. |
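
To see why a wrong key or a tampered payload tends to surface as a padding error, consider how block-cipher padding is validated after decryption. The sketch below assumes PKCS#7 padding, which is common with AES-CBC but is not confirmed by this page; `pkcs7_unpad` is a hypothetical helper, not the pipeline's decryption code.

```python
def pkcs7_unpad(data: bytes, block_size: int = 16) -> bytes:
    """Strip PKCS#7 padding, raising ValueError if it is malformed.

    Assumption: the decrypted payload uses PKCS#7 padding with the AES
    block size of 16 bytes.
    """
    if not data or len(data) % block_size != 0:
        raise ValueError("Invalid padding bytes")
    pad_len = data[-1]  # the last byte states how many padding bytes follow
    if pad_len < 1 or pad_len > block_size:
        raise ValueError("Invalid padding bytes")
    # Every padding byte must equal the padding length.
    if data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("Invalid padding bytes")
    return data[:-pad_len]
```

Decrypting with the wrong key produces effectively random bytes, and the tail of a random block rarely forms valid padding, so the check fails before any form data is parsed.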
