The pipeline runs four stages in sequence: fetcher, parser, mapper, writer. Each is a Python module with a clear input/output contract.

Fetcher

File: src/fetcher.py
Purpose: Pull unread "Metaweave Forms:" emails from a shared Outlook mailbox via Microsoft Graph.

Auth

OAuth2 client credentials flow via MSAL. The app needs Mail.Read + Mail.ReadWrite application permissions on the target mailbox (consented by an Azure AD admin).
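import msal

def get_access_token() -> str:
    app = msal.ConfidentialClientApplication(
        client_id=AZURE_CLIENT_ID,
        client_credential=AZURE_CLIENT_SECRET,
        authority=f"https://login.microsoftonline.com/{AZURE_TENANT_ID}",
    )
    result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    return result["access_token"]  # on failure MSAL returns "error" / "error_description" instead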

Query

Filters emails server-side via OData:
GET https://graph.microsoft.com/v1.0/users/{OUTLOOK_USER_EMAIL}/messages
  ?$filter=isRead eq false and contains(subject, 'Metaweave Forms')
  &$select=id,subject,body,receivedDateTime
  &$orderby=receivedDateTime asc
  &$top=50
Each call returns at most 50 messages ($top=50). If your fleet can produce more than 50 unread messages per run, add pagination.
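One way to go past 50 messages is to follow the @odata.nextLink URL that Graph includes in each page until it is absent. The sketch below is a minimal illustration, not the code in src/fetcher.py: iter_messages and the get_json callable (which would perform the authenticated GET and return the decoded JSON) are hypothetical names, and the canned pages stand in for real Graph responses.

```python
from typing import Callable, Iterator


def iter_messages(first_url: str, get_json: Callable[[str], dict]) -> Iterator[dict]:
    """Yield every message across pages by following @odata.nextLink."""
    url = first_url
    while url:
        page = get_json(url)
        yield from page.get("value", [])
        # Graph omits @odata.nextLink on the last page, which ends the loop.
        url = page.get("@odata.nextLink")


# Canned pages standing in for real Graph responses (illustrative only).
fake_pages = {
    "page1": {"value": [{"id": "a"}, {"id": "b"}], "@odata.nextLink": "page2"},
    "page2": {"value": [{"id": "c"}]},
}
message_ids = [m["id"] for m in iter_messages("page1", fake_pages.__getitem__)]
```

Because the filter depends on isRead, it is probably safest to collect all pages before marking any message as read, so that the result set does not shift while paging.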

Subject parsing

Each subject is parsed against a strict regex (config.py):
SUBJECT_PATTERN = re.compile(
    r"Metaweave Forms:\s*(.+?)\s*-\s*(.+?)\s*-\s*(\d{2}\.\d{2}\.\d{4})"
)
Captures (vessel_name, report_type_raw, report_date_DD.MM.YYYY). Subjects that don’t match are skipped — this is the gate that excludes stray emails from the same mailbox.
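As an illustration of how the gate behaves, the sketch below applies the same pattern and converts the date capture to a date object. parse_subject is a hypothetical helper name, the use of search() rather than match() is an assumption, and the sample subject is made up.

```python
import re
from datetime import datetime

SUBJECT_PATTERN = re.compile(
    r"Metaweave Forms:\s*(.+?)\s*-\s*(.+?)\s*-\s*(\d{2}\.\d{2}\.\d{4})"
)


def parse_subject(subject: str):
    """Return (vessel_name, report_type_raw, report_date), or None to skip."""
    match = SUBJECT_PATTERN.search(subject)
    if match is None:
        return None  # stray email in the same mailbox
    vessel_name, report_type_raw, raw_date = match.groups()
    try:
        report_date = datetime.strptime(raw_date, "%d.%m.%Y").date()
    except ValueError:
        return None  # digits matched the shape but are not a real date
    return vessel_name, report_type_raw, report_date


parsed = parse_subject("Metaweave Forms: MV Example - Noon Report - 13.04.2026")
skipped = parse_subject("Weekly newsletter")
```

Note that the pattern splits on the first hyphen, so a vessel name that itself contains a hyphen would end up partly in the report-type capture. Whether that matters depends on your fleet's naming.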

Body extraction

If the body’s contentType is html, fetcher strips HTML tags before passing to the parser. This handles forwarded plain-text wrapped in HTML by Outlook.
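A minimal sketch of this step using only the standard library follows. It is not the fetcher's actual implementation: body_to_text and _TextExtractor are hypothetical names, and treating br, p, and div as line breaks is an assumption about how Outlook wraps forwarded text.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect text nodes, dropping tags and <style>/<script> contents."""

    def __init__(self):
        super().__init__(convert_charrefs=True)
        self._chunks = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("style", "script"):
            self._skip_depth += 1
        elif tag in ("br", "p", "div"):
            self._chunks.append("\n")  # keep line structure for the parser

    def handle_endtag(self, tag):
        if tag in ("style", "script") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self._chunks.append(data)

    def text(self) -> str:
        return "".join(self._chunks)


def body_to_text(body: dict) -> str:
    """Return plain text from a Graph message body ({contentType, content})."""
    content = body.get("content", "")
    if body.get("contentType", "").lower() != "html":
        return content
    extractor = _TextExtractor()
    extractor.feed(content)
    extractor.close()
    return extractor.text().strip()


html_body = {
    "contentType": "html",
    "content": "<div>Report: Noon Report v00.01.02</div><div>Fuel &amp; ROB</div>",
}
plain = body_to_text(html_body)
```

Preserving line breaks matters here because the later header and marker extraction steps are line-oriented.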

Mark as read

PATCH /messages/{id} with {"isRead": true} after successful processing. Failed messages stay unread for retry on the next run.

Output

A list of FetchedEmail dataclasses:
@dataclass
class FetchedEmail:
    message_id: str
    subject: str
    body_text: str
    received_datetime: datetime
    vessel_name: str
    report_type_raw: str   # "Noon Report", "Arrival Notice", etc.
    report_date: date

Parser

File: src/parser.py
Purpose: Extract the encrypted payload from the email body and decrypt it into a JSON dict.

Header extraction

Pulls report_type_raw and form_version from a header line in the body:
re.search(r"Report:\s*(.+?)\s+(v[\d.]+)", body)
# ("Noon Report", "v00.01.02")

Marker extraction

Locates the encrypted block:
---------- BEGIN MW FORM DATA ---------------
<base64 ciphertext>
------------- END MW FORM DATA ----------------
Markers are configured in config.py:
MARKER_BEGIN = "BEGIN MW FORM DATA"
MARKER_END = "END MW FORM DATA"
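The extraction could look roughly like the sketch below, which tolerates a varying number of dashes around each marker and removes line wrapping from the base64 text. extract_ciphertext and _BLOCK_RE are hypothetical names, and the sample body is made up.

```python
import re

MARKER_BEGIN = "BEGIN MW FORM DATA"
MARKER_END = "END MW FORM DATA"

# Dashes and spaces around the markers may vary; base64 itself never contains "-".
_BLOCK_RE = re.compile(
    rf"-+\s*{re.escape(MARKER_BEGIN)}\s*-+(.*?)-+\s*{re.escape(MARKER_END)}\s*-+",
    re.DOTALL,
)


def extract_ciphertext(body: str):
    """Return the base64 text between the markers, or None if they are missing."""
    match = _BLOCK_RE.search(body)
    if match is None:
        return None
    # Mail clients often wrap long lines, so drop all whitespace.
    return "".join(match.group(1).split())


sample_body = (
    "Report: Noon Report v00.01.02\n"
    "---------- BEGIN MW FORM DATA ---------------\n"
    "QUJD\nREVG\n"
    "------------- END MW FORM DATA ----------------\n"
)
ciphertext = extract_ciphertext(sample_body)
```

A None result is the signal to try the text fallback instead of decryption.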

Decryption

import base64
import json

from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

def decrypt_payload(b64_ciphertext: str, key: str) -> dict:
    raw = base64.b64decode(b64_ciphertext)
    cipher = AES.new(key.encode(), AES.MODE_CBC, iv=key.encode())  # IV = key (16 bytes)
    padded = cipher.decrypt(raw)
    plain = unpad(padded, AES.block_size)  # strip PKCS7 padding
    return json.loads(plain.decode())
  • AES-128-CBC (16-byte key)
  • IV = key (matching the form’s CryptoJS encryption)
  • PKCS7 padding
  • Library: pycryptodome

Text fallback

If markers are missing (e.g. crew hand-edited and broke the block), parser falls back to a regex-based key-value extractor that reads section headers (---Section---) and key: value lines, flattening to a dict with keys like "Section::Key". Coverage is partial — most fields will arrive but rich nested arrays won’t.
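A rough sketch of such a fallback is shown below. It is an assumption about the approach rather than the parser's real code: fallback_parse and both regexes are hypothetical, and the sample text is invented.

```python
import re

_SECTION_RE = re.compile(r"^-{3}\s*(.+?)\s*-{3}$")   # ---Section---
_PAIR_RE = re.compile(r"^([^:]+):\s*(.*)$")          # key: value


def fallback_parse(body: str) -> dict:
    """Flatten section headers and key: value lines into "Section::Key" entries."""
    flat = {}
    section = ""
    for raw_line in body.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        header = _SECTION_RE.match(line)
        if header:
            section = header.group(1)
            continue
        pair = _PAIR_RE.match(line)
        if pair:
            key, value = pair.group(1).strip(), pair.group(2).strip()
            flat[f"{section}::{key}" if section else key] = value
    return flat


sample = """---Vessel---
Vessel Name: MV Example
---Weather---
Wind Force: 4
Remarks: ETA 12:00 local
"""
fallback = fallback_parse(sample)
```

Splitting on the first colon only keeps values such as times intact. Nested arrays have no line-based representation in this scheme, which is consistent with the partial coverage described above.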

Output

@dataclass
class ParseResult:
    form_data: dict        # the decrypted (or fallback) payload
    report_type_raw: str   # from the header line
    form_version: str

Mapper

File: src/mapper.py
Purpose: Translate the form’s JSON payload into SQLAlchemy model instances ready for the writer.

What it produces

{
    "vessel_info":        {"imo": ..., "name": ..., "code": ...},
    "voyage_number":      "V31",
    "report":             Report(...),                # 92 scalar fields
    "events":             [ReportEvent(...), ...],    # at-sea + in-port, with nested fuel breakdown
    "bunker_rob":         [ReportBunkerRob(...), ...],# per fuel type, per context
    "upcoming_ports":     [ReportUpcomingPort(...), ...],
    "fowe_periods":       [ReportFowePeriod(...), ...],
    "scrubber_breakdowns":[ReportScrubberBreakdown(...), ...],
    "bunker_deliveries":  [BunkerDelivery(...), ...],
    "bunker_biofuels":    [BunkerBiofuel(...), ...],
    "sof_activities":     [SofActivity(...), ...],
    "cargo_details":      [ReportCargo(...), ...],
    "month_end_bunker":   [MonthEndBunkerReport(...), ...],
    "berthing":           [BerthingDetails(...), ...],
}

Key transformations

| From form | To DB | Helper |
| --- | --- | --- |
| "6 1' 54\" N" | 6.0317 (decimal degrees) | dms_to_decimal() |
| "13.04.2026 12:00:00 +03:00" | datetime(2026, 4, 13, 9, 0, tzinfo=UTC) | parse_report_datetime() |
| "Yes" / "No" / "True" / "1" | True / False | parse_bool() |
| "123.45" (string) | Decimal("123.45") | safe_decimal() (returns None on failure) |
| "42" (string) | 42 | safe_int() |
All helpers in src/utils/datetime_utils.py and src/utils/coordinates.py return None on failure rather than raising — bad data becomes NULL, the run continues.
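To make the "None instead of raising" contract concrete, here is a sketch of how four of these helpers might be written. The function names come from the table above, but the bodies are assumptions: the accepted DMS layout, the four-decimal rounding, and the "false"/"0" tokens in parse_bool are guesses, not the project's actual rules.

```python
import re
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation

# Assumed layout: degrees, minutes', seconds", hemisphere letter.
_DMS_RE = re.compile(r"""^\s*(\d+)\s+(\d+)'\s*(\d+(?:\.\d+)?)"\s*([NSEW])\s*$""")


def dms_to_decimal(value):
    match = _DMS_RE.match(value) if isinstance(value, str) else None
    if match is None:
        return None
    degrees, minutes, seconds, hemisphere = match.groups()
    decimal = int(degrees) + int(minutes) / 60 + float(seconds) / 3600
    if hemisphere in "SW":
        decimal = -decimal  # south and west are negative
    return round(decimal, 4)


def parse_report_datetime(value):
    try:
        local = datetime.strptime(value, "%d.%m.%Y %H:%M:%S %z")
    except (TypeError, ValueError):
        return None
    return local.astimezone(timezone.utc)


def parse_bool(value):
    token = str(value).strip().lower()
    if token in {"yes", "true", "1"}:
        return True
    if token in {"no", "false", "0"}:  # "false" and "0" are assumed
        return False
    return None


def safe_decimal(value):
    try:
        return Decimal(str(value).strip())
    except (InvalidOperation, ValueError):
        return None
```

With this shape, the mapper can assign each helper's result straight to a nullable column without wrapping every field in its own try/except.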

Per-event nested fuel

Each event has a nested fuel array with 12 consumption categories per fuel type:
propulsion · maneuver · generator · loaddischarge · deballast · igs ·
boiler · incinerator · cargoheating · tankcleaning · others · flushing
Plus subtotals: main_engine_consumption, aux_engine_consumption, total_consumption. These map to EventFuelConsumption rows attached to each ReportEvent.

Context routing

The mapper reads a different set of array names depending on location (At Sea / In Port):

| Location | Array names |
| --- | --- |
| At Sea | atseaeventrobdetails, atseabunkerrobdetails, gsatseaeventtypes |
| In Port | inporteventrobdetails, inportbunkerrobdetails, gsinporteventtypes |
Both flow into the same DB tables; a context column tags each row.
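The routing can be pictured as a lookup table keyed by context. The sketch below is illustrative only: the array names are the ones listed above, while CONTEXT_ARRAYS, collect_rows, the role keys, and the "at_sea"/"in_port" context values are assumptions.

```python
CONTEXT_ARRAYS = {
    "at_sea": {
        "events": "atseaeventrobdetails",
        "bunker_rob": "atseabunkerrobdetails",
        "event_types": "gsatseaeventtypes",
    },
    "in_port": {
        "events": "inporteventrobdetails",
        "bunker_rob": "inportbunkerrobdetails",
        "event_types": "gsinporteventtypes",
    },
}


def collect_rows(form_data: dict, kind: str) -> list:
    """Gather one kind of row from both contexts, tagging each with its context."""
    rows = []
    for context, arrays in CONTEXT_ARRAYS.items():
        # A missing or null array simply contributes no rows.
        for item in form_data.get(arrays[kind]) or []:
            rows.append({**item, "context": context})
    return rows


form_data = {
    "atseaeventrobdetails": [{"event": "Noon"}],
    "inporteventrobdetails": [{"event": "Berthed"}],
}
event_rows = collect_rows(form_data, "events")
```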

Writer

File: src/writer.py
Purpose: Upsert into PostgreSQL.

Upsert sequence

def write_report(session, mapped, email_message_id):
    vessel = upsert_vessel(session, mapped["vessel_info"])             # by imo_number
    voyage = upsert_voyage(session, vessel.vessel_id, mapped["voyage_number"])
    report = mapped["report"]      # bind before the lookup below uses it

    # Replacement: same (vessel, type, datetime) → delete old, insert new
    existing = session.query(Report).filter_by(
        vessel_id=vessel.vessel_id,
        report_type=report.report_type,
        report_datetime_utc=report.report_datetime_utc,
    ).one_or_none()
    if existing:
        session.delete(existing)   # CASCADE removes rows in all 11 child tables
        session.flush()

    report.vessel_id = vessel.vessel_id
    report.voyage_id = voyage.voyage_id
    report.email_message_id = email_message_id
    report.received_at = datetime.utcnow()
    session.add(report)
    session.flush()                # get report_id

    # Attach all 11 child arrays with report_id FK
    for key, items in mapped.items():
        if key in CHILD_KEYS:
            for item in items:
                item.report_id = report.report_id
                session.add(item)

    return report

CASCADE delete

All 11 child tables declare:
ForeignKey("metaweave_report.report_id", ondelete="CASCADE")
So DELETE FROM metaweave_report WHERE report_id=… removes the entire family in one statement. This is what makes corrections clean.
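The cascade behavior can be tried in isolation with the standard library. The sketch below uses an in-memory SQLite database rather than PostgreSQL, purely so it runs anywhere; report_event is a hypothetical child table name, and the PRAGMA line is a SQLite requirement that PostgreSQL does not need.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite-only: FK enforcement is off by default
conn.execute("CREATE TABLE metaweave_report (report_id INTEGER PRIMARY KEY)")
conn.execute(
    """
    CREATE TABLE report_event (          -- hypothetical child table
        event_id  INTEGER PRIMARY KEY,
        report_id INTEGER NOT NULL
            REFERENCES metaweave_report (report_id) ON DELETE CASCADE
    )
    """
)
conn.execute("INSERT INTO metaweave_report (report_id) VALUES (1)")
conn.executemany("INSERT INTO report_event (report_id) VALUES (?)", [(1,), (1,), (1,)])

# Deleting the parent row removes its three child rows in the same statement.
conn.execute("DELETE FROM metaweave_report WHERE report_id = 1")
remaining_events = conn.execute("SELECT COUNT(*) FROM report_event").fetchone()[0]
```

The database does the cleanup itself, so the writer never has to enumerate child tables when replacing a report.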

Why delete-then-insert (not UPDATE)

Reports have variable-length child arrays. A single UPDATE would have to diff: which events to add, update, delete? Delete-then-insert is simpler and faster for typical sizes (~5 events, ~4 bunker ROBs per report). Atomicity is preserved by the surrounding transaction — session.commit() covers both the delete and the insert.
