
Phase 3 — Data Platform

Status: Shipped ✅ · Owner: Data Platform Lead · Duration: 4 weeks · Gate: G3

1. Overview

Phase 3 builds the data backbone of Atlas: the canonical Asset / Location / System / DataPoint / Connection model; the connector framework that ingests from BACnet, MQTT, REST, webhooks, CSV/XLSX uploads, and S3 file drops; the time-series store for telemetry; the data-quality engine that scores every ingest; and the ontology / tagging layer that lets every downstream feature (Dashboards, AI, Workflow) query data semantically rather than by raw field name.

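This is the phase that makes everything after it work: Dashboards (Phase 5), AI (Phase 6), Workflow (Phase 7), and Mobile (Phase 8) all build on the model, ingest paths, and tags delivered here.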

2. Objectives

  • O3.1 — Canonical model (Asset, Location, System, DataPoint, Connection, Gateway, Site, Building) lives in MongoDB with full CRUD via the API.
  • O3.2 — Connector framework with five working adapters: BACnet/IP (via edge gateway), MQTT/MQTTS (broker subscribe), REST inbound, Webhook inbound (HMAC), File upload (CSV/XLSX/JSON).
  • O3.3 — Time-series store for telemetry sustaining ingestion of >5,000 events/sec/tenant, with query p95 < 300 ms for typical dashboard ranges.
  • O3.4 — Data-quality scoring on every batch with a visible scorecard per source.
  • O3.5 — Tagging / ontology layer (Brick-Schema-compatible) with admin UI.
  • O3.6 — Bulk import workflows for Asset Ledger (Excel) and Work-Order history (CSV) — proven with the KTC sample data.
  • O3.7 — Real-time channel (SSE) for live telemetry to dashboards and workflows.

3. Scope

3.1 In-scope

  • Mongo collections per the data model in Phase_1.md §5.7.
  • Time-series collection for telemetry with an appropriate shard key.
  • Connector framework: pluggable adapter pattern (Connector interface), config-driven, lifecycle hooks.
  • Edge-gateway companion: a small Node.js agent (separate package) that runs on Advantech ECU-1051 (or equivalent), translates BACnet to MQTTS, manages local buffer.
  • MQTT broker selection + integration (EMQX or Mosquitto; ADR-015).
  • Ingest endpoints /api/v1/ingest/* (REST, Webhook, File).
  • Data-quality engine (completeness, accuracy, freshness, conformity, uniqueness, validity).
  • Tagging/ontology UI + API.
  • Bulk import wizards (Excel → Asset; CSV → Work Order; JSON → DataPoint mapping).
  • Reconciliation report (Asset Ledger ↔ BMS device list ↔ ceiling-plan placement).
  • SSE channels for telemetry subscription.

3.2 Out-of-scope

  • Dashboards (Phase 5) — telemetry will be testable via API + simple devtools UI.
  • AI auto-categorisation (Phase 6) — manual categorisation only.
  • Workflow triggers (Phase 7) — telemetry events publish to a topic; Phase 7 subscribes.
  • Mobile capture forms (Phase 8).

4. Dependencies

  • Phase 2 complete (auth, RBAC, OpenAPI, audit, observability).

5. Architecture & Design

5.1 Ingest topology

                                ┌─────────────────────┐
   ┌──────────┐   BACnet/IP     │   Edge Gateway      │   MQTTS
   │   BMS    │ ──────────────▶ │  (Advantech/...)    │ ───────┐
   └──────────┘                 │  + buffer + map     │        │
                                └─────────────────────┘        ▼
   ┌──────────┐    MQTT/MQTTS                          ┌───────────────┐
   │ IoT / 3P │ ───────────────────────────────────────▶│ MQTT Broker   │
   └──────────┘                                         │ (EMQX)        │
                                                         └──────┬────────┘
                                                                ▼
                                                       ┌────────────────────┐
   ┌──────────┐   POST /ingest/rest          ┌────────▶│  Ingest Service    │
   │ 3P API   │ ─────────────────────────────┘        │  - validate (Zod)  │
   └──────────┘                                       │  - DQ score        │
                                                       │  - tag normalise   │
   ┌──────────┐   POST /ingest/webhook (HMAC)         │  - persist         │
   │ Webhook  │ ──────────────────────────────────────▶│  - publish event   │
   └──────────┘                                       └──────┬─────────────┘
                                                              │
   ┌──────────┐   POST /ingest/file (multipart)               │
   │ File ↑   │ ─────────────────────────────────────────────▶│
   └──────────┘                                                ▼
                                                       ┌──────────────────┐
                                                       │  MongoDB         │
                                                       │  · telemetry TS  │
                                                       │  · entities      │
                                                       │  · auditLog      │
                                                       └──────┬───────────┘
                                                              │
                                                              ▼
                                                       ┌──────────────────┐
                                                       │ SSE / Pub-Sub    │
                                                       │ to downstream    │
                                                       └──────────────────┘

5.2 Connector interface

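// ConnectorHandle, HealthStatus, IngestBatch and Logger are shared types not shown here.
export interface Connector<TConfig = unknown> {
  // 'modbus' and 'opc' back the stretch adapters in task 3.B.7.
  kind: 'bacnet' | 'mqtt' | 'rest' | 'webhook' | 'file' | 'modbus' | 'opc';
  // Checks untrusted config input and returns it as the adapter's typed config.
  validateConfig(cfg: unknown): TConfig;
  // Starts ingesting; the returned handle is passed to health() and stop().
  start(ctx: ConnectorCtx, cfg: TConfig): Promise<ConnectorHandle>;
  health(handle: ConnectorHandle): Promise<HealthStatus>;
  stop(handle: ConnectorHandle): Promise<void>;
}

// Per-connector context supplied by the framework.
export interface ConnectorCtx {
  tenantId: string;
  gatewayId?: string;                            // optional: the edge gateway the data arrives through
  emit: (batch: IngestBatch) => Promise<void>;   // hands a batch to the ingest pipeline (§5.3)
  log: Logger;
}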

Each connector lives in packages/connectors/<kind> and is registered in a ConnectorRegistry at boot.
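A minimal sketch of how the ConnectorRegistry might track adapters and running instances, assuming one adapter per `kind` and that config is validated before `start()`. To stay self-contained it uses trimmed-down, hypothetical stand-ins for the interfaces above (a plain `tenantId` instead of `ConnectorCtx`, a `Handle` with only an `id`), and the method names `register`, `startConnector`, and `stopAll` are illustrative rather than the shipped API.

```typescript
// Hypothetical, trimmed-down stand-ins for the Connector interfaces (illustration only).
type Kind = 'bacnet' | 'mqtt' | 'rest' | 'webhook' | 'file' | 'modbus' | 'opc';
interface Handle { id: string }
interface Connector<TConfig = unknown> {
  kind: Kind;
  validateConfig(cfg: unknown): TConfig;
  start(tenantId: string, cfg: TConfig): Promise<Handle>;
  stop(handle: Handle): Promise<void>;
}

// Hypothetical registry: one adapter per kind, plus the set of running instances.
class ConnectorRegistry {
  private readonly adapters = new Map<Kind, Connector<any>>();
  private readonly running = new Map<string, { connector: Connector<any>; handle: Handle }>();

  // Called once per adapter package at boot; a duplicate kind is a wiring bug.
  register(connector: Connector<any>): void {
    if (this.adapters.has(connector.kind)) {
      throw new Error(`connector already registered: ${connector.kind}`);
    }
    this.adapters.set(connector.kind, connector);
  }

  // Validates raw config first, so invalid config never reaches start().
  async startConnector(kind: Kind, tenantId: string, rawCfg: unknown): Promise<Handle> {
    const connector = this.adapters.get(kind);
    if (!connector) throw new Error(`unknown connector kind: ${kind}`);
    const cfg = connector.validateConfig(rawCfg);
    const handle = await connector.start(tenantId, cfg);
    this.running.set(handle.id, { connector, handle });
    return handle;
  }

  // Stops every running connector, e.g. on graceful shutdown.
  async stopAll(): Promise<void> {
    for (const { connector, handle } of Array.from(this.running.values())) {
      await connector.stop(handle);
    }
    this.running.clear();
  }

  get size(): number {
    return this.running.size;
  }
}
```

Throwing on duplicate registration surfaces wiring mistakes at boot rather than at first ingest.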

5.3 Ingest pipeline

Every batch flows through:

  1. Authenticate (API key, HMAC, or gateway cert).
  2. Validate (Zod schema for the batch shape).
  3. Normalise (apply unit conversion, timestamp normalisation, timezone).
  4. Tag (apply mapping rules to attach Brick concepts).
  5. DQ score (per-field rules; produces row-level and batch-level score).
  6. Persist (write to time-series for telemetry, to typed collection for entities).
  7. Audit (one entry per batch).
  8. Publish (events to event.bus topic for SSE + Workflow + AI subscribers).
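The eight steps compose naturally as an ordered list of async stages, each receiving the previous stage's output. A minimal sketch, assuming a hypothetical `Stage` type and a simplified `Batch` shape; the placeholder stages stand in for the real auth, Zod, tagging, DQ, repository, audit, and event-bus calls, and the toy DQ rule is illustrative only.

```typescript
// Simplified, hypothetical batch shape (the real IngestBatch is richer).
interface Batch {
  tenantId: string;
  rows: Array<Record<string, unknown>>;
  dqScore?: number;
  trace: string[]; // names of the stages applied, for illustration
}

type Stage = { name: string; run: (batch: Batch) => Promise<Batch> };

// Runs stages strictly in order; any stage may throw to reject the whole batch.
async function runPipeline(stages: Stage[], input: Batch): Promise<Batch> {
  let batch = input;
  for (const stage of stages) {
    batch = await stage.run(batch);
    batch = { ...batch, trace: [...batch.trace, stage.name] };
  }
  return batch;
}

// Placeholder stage that passes the batch through unchanged.
const passThrough = (name: string): Stage => ({ name, run: async (b) => b });

const defaultStages: Stage[] = [
  passThrough('authenticate'),
  {
    name: 'validate',
    run: async (b) => {
      if (b.rows.length === 0) throw new Error('empty batch');
      return b;
    },
  },
  passThrough('normalise'),
  passThrough('tag'),
  // Toy DQ score: fraction of rows carrying a numeric `value` field.
  {
    name: 'dq',
    run: async (b) => ({
      ...b,
      dqScore: b.rows.filter((r) => typeof r.value === 'number').length / b.rows.length,
    }),
  },
  passThrough('persist'),
  passThrough('audit'),
  passThrough('publish'),
];
```

Keeping each step as a named stage makes it straightforward to attach the audit and observability hooks from task 3.C.3 around every stage.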

5.4 Time-series storage

  • MongoDB Time Series Collection named telemetry.
  • timeField: 't', metaField: 'meta', granularity: 'minutes'.
  • meta carries {tenantId, dataPointId, assetId, tags[]}.
  • Indexes on meta.tenantId + meta.dataPointId + t (time-bucketed).
  • Aggregations: pre-roll 1m / 5m / 1h / 1d via background job for fast dashboard queries.
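The settings above map onto MongoDB's `timeseries` collection options, and a pre-roll bucket can be computed with the `$dateTrunc` operator (MongoDB 5.0+). The sketch below only builds the option and pipeline objects, so it runs without a database; with the official Node.js driver they would be passed to `db.createCollection('telemetry', opts)` and `collection.aggregate(pipeline)`. The function names, the numeric `v` value field, and the avg/min/max/count output are assumptions.

```typescript
type Granularity = 'seconds' | 'minutes' | 'hours';

// Options for db.createCollection('telemetry', ...) matching the settings above.
function telemetryCollectionOptions(granularity: Granularity = 'minutes') {
  return {
    timeseries: { timeField: 't', metaField: 'meta', granularity },
  };
}

// Bucket sizes for the pre-rolled aggregates (1m / 5m / 1h / 1d).
const BUCKETS = {
  '1m': { unit: 'minute', binSize: 1 },
  '5m': { unit: 'minute', binSize: 5 },
  '1h': { unit: 'hour', binSize: 1 },
  '1d': { unit: 'day', binSize: 1 },
} as const;

type Bucket = keyof typeof BUCKETS;

// Hypothetical helper: rolls one DataPoint's raw samples into fixed buckets.
// Assumes each sample stores its reading in a numeric `v` field.
function rollupPipeline(tenantId: string, dataPointId: string, from: Date, to: Date, bucket: Bucket) {
  const { unit, binSize } = BUCKETS[bucket];
  return [
    // Equality on the meta fields plus a range on t lines up with the index above.
    { $match: { 'meta.tenantId': tenantId, 'meta.dataPointId': dataPointId, t: { $gte: from, $lt: to } } },
    {
      $group: {
        _id: { $dateTrunc: { date: '$t', unit, binSize } },
        avg: { $avg: '$v' },
        min: { $min: '$v' },
        max: { $max: '$v' },
        count: { $sum: 1 },
      },
    },
    { $sort: { _id: 1 } },
  ];
}
```

The background pre-roll job could run this per DataPoint and store the results, so a "last 24h at 5m" dashboard query reads at most 288 pre-computed buckets instead of every raw sample.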

5.5 Ontology / tagging

  • Base ontology: Brick Schema 1.4 (Class hierarchy: Equipment, Point, Location, etc.).
  • Custom extensions per tenant (additive only).
  • Tag CRUD with admin UI, search, hierarchy view.
  • Auto-suggestion: when a new DataPoint name matches a known pattern (e.g., *_temp_sensor), suggest the tags [Sensor, Temperature_Sensor].
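The auto-suggester can be as simple as an ordered list of glob-style name patterns, each mapped to Brick classes. A minimal sketch, assuming `*` is the only wildcard and matching is case-insensitive; the rule list and the `suggestTags` / `globToRegExp` helpers are hypothetical, and in Atlas the rules would presumably come from the tenant's tagging configuration.

```typescript
// Hypothetical rule shape for the auto-suggester.
interface SuggestRule {
  pattern: string; // glob-style; '*' matches any run of characters
  tags: string[];  // Brick classes to suggest
}

// Illustrative rules only, not the shipped rule set.
const RULES: SuggestRule[] = [
  { pattern: '*_temp_sensor', tags: ['Sensor', 'Temperature_Sensor'] },
  { pattern: '*_co2_sensor', tags: ['Sensor', 'CO2_Sensor'] },
  { pattern: '*_fan_status', tags: ['Status', 'Fan_Status'] },
];

// Converts a glob to an anchored, case-insensitive RegExp, escaping regex metacharacters first.
function globToRegExp(glob: string): RegExp {
  const escaped = glob.replace(/[.+?^${}()|[\]\\]/g, '\\$&').replace(/\*/g, '.*');
  return new RegExp(`^${escaped}$`, 'i');
}

// Returns the de-duplicated tags of every matching rule, in rule order.
function suggestTags(dataPointName: string, rules: SuggestRule[] = RULES): string[] {
  const out = new Set<string>();
  for (const rule of rules) {
    if (globToRegExp(rule.pattern).test(dataPointName)) {
      rule.tags.forEach((t) => out.add(t));
    }
  }
  return Array.from(out);
}
```

Suggestions stay suggestions: an admin confirms them in the tagging UI, which keeps the categorisation manual in this phase.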

5.6 Bulk import — wizards

  • Asset Ledger (Excel): detect sheet, map columns (with persistent mapping templates), preview top 50 rows, dry-run validation, commit.
  • Work Order (CSV, with encoding detection — UTF-8 and UTF-16): same pattern; supports the KTC sample.
  • DataPoint (CSV/JSON): BACnet mapping table → DataPoints + Connections + Assets.
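Encoding detection for the CSV wizard can start from the byte-order mark, with a fallback heuristic for BOM-less UTF-16: ASCII-heavy UTF-16LE text has a zero byte at most odd offsets (UTF-16BE at even offsets). A minimal sketch under those assumptions; `detectEncoding` is a hypothetical helper, and the 4 KB sample and 0.3 threshold are arbitrary illustrative values.

```typescript
type Encoding = 'utf-8' | 'utf-16le' | 'utf-16be';

// Hypothetical helper: guesses a CSV file's encoding from its first bytes.
function detectEncoding(bytes: Uint8Array): { encoding: Encoding; bomLength: number } {
  // Byte-order marks take precedence.
  if (bytes.length >= 3 && bytes[0] === 0xef && bytes[1] === 0xbb && bytes[2] === 0xbf) {
    return { encoding: 'utf-8', bomLength: 3 };
  }
  if (bytes.length >= 2 && bytes[0] === 0xff && bytes[1] === 0xfe) {
    return { encoding: 'utf-16le', bomLength: 2 };
  }
  if (bytes.length >= 2 && bytes[0] === 0xfe && bytes[1] === 0xff) {
    return { encoding: 'utf-16be', bomLength: 2 };
  }
  // No BOM: count zero bytes at even vs. odd offsets in a sample.
  const sample = bytes.subarray(0, 4096);
  let evenZeros = 0;
  let oddZeros = 0;
  for (let i = 0; i < sample.length; i++) {
    if (sample[i] === 0) {
      if (i % 2 === 0) evenZeros++;
      else oddZeros++;
    }
  }
  const pairs = Math.floor(sample.length / 2) || 1;
  if (oddZeros / pairs > 0.3 && oddZeros > evenZeros) return { encoding: 'utf-16le', bomLength: 0 };
  if (evenZeros / pairs > 0.3 && evenZeros > oddZeros) return { encoding: 'utf-16be', bomLength: 0 };
  return { encoding: 'utf-8', bomLength: 0 };
}
```

The detected label can then be handed to a decoder such as the standard `TextDecoder` before the rows are parsed and shown in the wizard preview.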

Every import:

  • Persists the source file in S3.
  • Writes an importJob document with status, mapping, counts, errors.
  • Produces a downloadable report (CSV) for review.
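One possible shape for the `importJob` document, plus the CSV review report generated from it. The field names, status values, and the `errorReportCsv` / `csvField` helpers are assumptions for illustration; the quoting follows the usual RFC 4180 convention (quote fields containing commas, quotes, or line breaks, and double any embedded quotes).

```typescript
// Hypothetical importJob shape; field names and statuses are illustrative.
type ImportStatus = 'pending' | 'dry_run' | 'committing' | 'completed' | 'failed';

interface ImportError {
  row: number;     // 1-based row in the source file
  column: string;
  message: string;
}

interface ImportJob {
  tenantId: string;
  kind: 'asset' | 'workorder' | 'datapoint';
  sourceKey: string;                 // S3 object key of the persisted source file
  status: ImportStatus;
  mapping: Record<string, string>;   // source column -> target field
  counts: { total: number; imported: number; failed: number };
  errors: ImportError[];
}

// RFC 4180-style quoting: wrap only when needed, doubling embedded quotes.
function csvField(value: string | number): string {
  const s = String(value);
  return /[",\r\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
}

// Builds the downloadable review report for one job.
function errorReportCsv(job: ImportJob): string {
  const lines = [['row', 'column', 'message'].join(',')];
  for (const e of job.errors) {
    lines.push([e.row, e.column, e.message].map(csvField).join(','));
  }
  return lines.join('\r\n') + '\r\n';
}
```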

5.7 Reconciliation engine

  • Compares Asset Ledger ↔ BMS device list ↔ ceiling-plan placement.
  • Surface deltas: orphan in ledger, orphan in BMS, conflicting attributes, location mismatch.
  • Admin can resolve each delta (merge, reassign, ignore) — each resolution is audited.
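At its core the reconciliation engine is a keyed diff between sources. A minimal sketch comparing two of the three sources (Asset Ledger vs. BMS device list), assuming records can be joined on a normalised tag and that only `location` and `type` are compared, so a location mismatch appears here as a conflict on the `location` field; the record shape, join key, and `reconcile` name are hypothetical. Ceiling-plan placement would add a third keyed set in the same way.

```typescript
// Hypothetical record shape shared by both sources.
interface SourceRecord {
  tag: string;       // assumed join key (asset tag / BMS device name)
  location?: string;
  type?: string;
}

type Delta =
  | { kind: 'orphan_in_ledger'; tag: string }
  | { kind: 'orphan_in_bms'; tag: string }
  | { kind: 'conflict'; tag: string; field: 'location' | 'type'; ledger?: string; bms?: string };

// Normalises join keys so "ahu-01 " and "AHU-01" match.
const key = (tag: string) => tag.trim().toUpperCase();

function reconcile(ledger: SourceRecord[], bms: SourceRecord[]): Delta[] {
  const bmsByKey = new Map(bms.map((r) => [key(r.tag), r] as const));
  const seen = new Set<string>();
  const deltas: Delta[] = [];

  for (const rec of ledger) {
    const k = key(rec.tag);
    seen.add(k);
    const other = bmsByKey.get(k);
    if (!other) {
      deltas.push({ kind: 'orphan_in_ledger', tag: rec.tag });
      continue;
    }
    // Field-level comparison for records present in both sources.
    for (const field of ['location', 'type'] as const) {
      if (rec[field] !== other[field]) {
        deltas.push({ kind: 'conflict', tag: rec.tag, field, ledger: rec[field], bms: other[field] });
      }
    }
  }
  // Anything in the BMS list never matched by a ledger row is a BMS orphan.
  for (const rec of bms) {
    if (!seen.has(key(rec.tag))) deltas.push({ kind: 'orphan_in_bms', tag: rec.tag });
  }
  return deltas;
}
```

Each delta is then a unit an admin can resolve (merge, reassign, ignore), with the resolution written to the audit log.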

5.8 Edge-gateway agent

  • Small Node.js binary deployable on Advantech ECU-1051 (Linux ARM/x86) or any host with Node ≥20.
  • Reads local config (gateway.yaml), authenticates to Atlas with a per-gateway certificate.
  • BACnet/IP client (using node-bacnet or bacstack); subscribes to configured points.
  • Local buffer (SQLite) for store-and-forward during disconnects.
  • MQTTS uplink with QoS 1.
  • Health beacon every 30s to Atlas.
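The store-and-forward loop can be sketched independently of SQLite and MQTT: readings are appended to a durable queue, and a flush removes each entry only after the uplink acknowledges it. That matches QoS 1's at-least-once delivery (a reading may be re-sent after a crash but is not silently dropped). In this sketch an in-memory array stands in for the SQLite table and `publish` stands in for an MQTTS publish that resolves on PUBACK; both are assumptions, as are the `StoreAndForward` class and its method names.

```typescript
// Hypothetical buffered reading.
interface Reading {
  seq: number;        // local, monotonically increasing id (e.g. a SQLite rowid)
  pointId: string;
  value: number;
  t: string;          // ISO-8601 timestamp
}

// Resolves when the broker acknowledges the message; rejects while offline.
type Publish = (reading: Reading) => Promise<void>;

// Assumes flush() is never called concurrently.
class StoreAndForward {
  private queue: Reading[] = []; // stand-in for a SQLite table
  private nextSeq = 1;

  constructor(private readonly publish: Publish) {}

  // Always buffer first, so a disconnect never loses an accepted reading.
  enqueue(pointId: string, value: number, t: string): void {
    this.queue.push({ seq: this.nextSeq++, pointId, value, t });
  }

  // Sends oldest-first and stops at the first failure; returns how many were sent.
  async flush(): Promise<number> {
    let sent = 0;
    for (;;) {
      const head = this.queue[0];
      if (head === undefined) break;
      try {
        await this.publish(head);
      } catch {
        break; // still offline: keep this and later readings for the next flush
      }
      this.queue.shift(); // remove only after the broker acknowledged it
      sent++;
    }
    return sent;
  }

  get pending(): number {
    return this.queue.length;
  }
}
```

Stopping at the first failure preserves ordering per gateway, which keeps replayed telemetry in time order when the uplink returns.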

6. Detailed Specifications

6.1 API surface (Phase 3 additions)

# Entities
GET    /api/v1/sites
POST   /api/v1/sites
…  same CRUD for /buildings, /locations, /systems, /assets, /data-points, /connections, /gateways

# Ingestion
POST   /api/v1/ingest/rest                 (tenant API key)
POST   /api/v1/ingest/webhook/:id          (HMAC)
POST   /api/v1/ingest/file                 (multipart, asset/workorder/datapoint mapping)
GET    /api/v1/ingest/jobs
GET    /api/v1/ingest/jobs/:id

# Telemetry
GET    /api/v1/telemetry
        ?dataPointIds=…&from=…&to=…&agg=raw|1m|5m|1h|1d
GET    /api/v1/telemetry/stream            (SSE, query-bound)

# Tagging
GET    /api/v1/tags
POST   /api/v1/tags
PATCH  /api/v1/tags/:id
DELETE /api/v1/tags/:id

# Reconciliation
POST   /api/v1/recon/run
GET    /api/v1/recon/reports/:id
POST   /api/v1/recon/reports/:id/resolve
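For POST /api/v1/ingest/webhook/:id, HMAC verification with a replay window (task 3.B.5) commonly signs a timestamp together with the raw body, rejects requests outside the window, and compares signatures in constant time. A minimal sketch using Node's built-in `crypto` module; the `${timestamp}.${rawBody}` signing string, HMAC-SHA256 with hex encoding, the millisecond timestamp, the 5-minute window, and the `sign` / `verifyWebhook` names are all assumptions, not the documented Atlas contract.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

const REPLAY_WINDOW_MS = 5 * 60 * 1000; // assumed tolerance

// Hypothetical signing scheme: hex(HMAC-SHA256(secret, `${timestamp}.${rawBody}`)).
function sign(secret: string, timestamp: string, rawBody: string): string {
  return createHmac('sha256', secret).update(`${timestamp}.${rawBody}`).digest('hex');
}

function verifyWebhook(
  secret: string,
  timestamp: string,   // unix milliseconds sent by the caller, e.g. in a header
  signature: string,   // hex signature sent by the caller
  rawBody: string,     // exact body received, before JSON parsing
  now: number = Date.now(),
): boolean {
  const ts = Number(timestamp);
  // Reject stale or future-dated requests to bound replay attacks.
  if (!Number.isFinite(ts) || Math.abs(now - ts) > REPLAY_WINDOW_MS) return false;

  const expected = Buffer.from(sign(secret, timestamp, rawBody), 'hex');
  const given = Buffer.from(signature, 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return given.length === expected.length && timingSafeEqual(given, expected);
}
```

Signing the raw body rather than re-serialised JSON matters: any reformatting between receipt and verification would change the HMAC.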

6.2 Data-quality rules (default set)

Rule          Description
Completeness  Required fields present per schema.
Accuracy      Values within configured bounds (per DataPoint).
Freshness     Last value within freshness SLA.
Conformity    Units match expected; enum values valid.
Uniqueness    Primary identifiers unique within tenant.
Validity      Type validation; reference targets exist.

Each rule produces a score between 0 and 1 per row; the weighted batch score is persisted on the import or batch document.
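One way to turn per-row rule scores into the persisted batch score: each row's score is the weighted mean of the rule scores present on it, and the batch score is the mean of row scores. The weights, the mean-of-rows aggregation, and the convention that a row or batch with nothing to score counts as 1 are all assumptions for illustration; the document only states that the batch score is weighted.

```typescript
type Rule = 'completeness' | 'accuracy' | 'freshness' | 'conformity' | 'uniqueness' | 'validity';

// Illustrative weights only, not the shipped defaults.
const WEIGHTS: Record<Rule, number> = {
  completeness: 2, accuracy: 2, freshness: 1, conformity: 1, uniqueness: 1, validity: 3,
};

// Weighted mean of the rule scores present on one row (each score in 0..1).
function rowScore(scores: Partial<Record<Rule, number>>, weights = WEIGHTS): number {
  let num = 0;
  let den = 0;
  for (const [rule, s] of Object.entries(scores) as [Rule, number][]) {
    num += weights[rule] * s;
    den += weights[rule];
  }
  return den === 0 ? 1 : num / den; // assumed: no applicable rules counts as compliant
}

// Batch score = mean of the row scores (assumed aggregation).
function batchScore(rows: Array<Partial<Record<Rule, number>>>, weights = WEIGHTS): number {
  if (rows.length === 0) return 1;
  return rows.reduce((sum, r) => sum + rowScore(r, weights), 0) / rows.length;
}
```

For example, a row that passes completeness (1) but fails validity (0) scores (2·1 + 3·0) / (2 + 3) = 0.4 under these weights.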

6.3 Permissions added

site.read site.create site.update site.delete
building.* location.* system.* asset.* datapoint.* connection.* gateway.*
ingest.write ingest.read
telemetry.read telemetry.stream
tag.read tag.create tag.update tag.delete
recon.run recon.resolve
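If role grants are allowed to use the same resource.* form as the list above (an assumption: there it may simply abbreviate the four CRUD actions), a permission check can treat it as a wildcard over every action on that resource. A minimal sketch; `hasPermission` and the `dataSteward` role are hypothetical, and the real check would sit in the Phase 2 RBAC layer.

```typescript
// Hypothetical check: 'asset.*' covers 'asset.read', 'asset.update', etc. (assumed semantics).
function hasPermission(granted: readonly string[], required: string): boolean {
  const [resource] = required.split('.');
  return granted.some((g) => g === required || g === `${resource}.*`);
}

// Hypothetical role built from the Phase 3 permissions.
const dataSteward: readonly string[] = ['asset.*', 'datapoint.read', 'tag.read', 'tag.create', 'recon.run'];
```

With this role, `hasPermission(dataSteward, 'asset.delete')` holds through the wildcard, while `recon.resolve` is denied because only `recon.run` was granted.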

7. Implementation Tasks

Epic 3.A — Models & repositories

  • 3.A.1 Mongoose models per §5.7 of Phase 1.
  • 3.A.2 Time-series collection setup script.
  • 3.A.3 Repositories for every entity, with tenant guard + Zod validation.
  • 3.A.4 Indexes per query pattern.

Epic 3.B — Connector framework

  • 3.B.1 Connector interface, registry, lifecycle.
  • 3.B.2 BACnet/IP adapter (with edge agent companion).
  • 3.B.3 MQTT/MQTTS adapter (broker subscribe).
  • 3.B.4 REST inbound adapter (API-key auth).
  • 3.B.5 Webhook adapter (HMAC, replay window).
  • 3.B.6 File adapter (CSV/XLSX/JSON) + mapping wizard.
  • 3.B.7 Modbus + OPC-UA adapters (stretch; gated by demand).

Epic 3.C — Ingest pipeline

  • 3.C.1 Pipeline orchestrator (validate → normalise → tag → DQ → persist → publish).
  • 3.C.2 DQ engine with default rule set.
  • 3.C.3 Audit + observability hooks.

Epic 3.D — Telemetry

  • 3.D.1 Time-series writes.
  • 3.D.2 Query API with aggregation buckets.
  • 3.D.3 Pre-roll background jobs.
  • 3.D.4 SSE stream endpoint.
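For the SSE stream endpoint, each telemetry event is written as a text frame in the text/event-stream format: optional `id:` and `event:` lines, one `data:` line per line of payload, and a blank line to end the frame. Setting `id` lets a reconnecting browser `EventSource` send `Last-Event-ID`, so the server can resume from that point. A minimal formatter sketch; the `telemetry` event name, the payload shape, and the `sseFrame` / `telemetryFrame` helpers are assumptions.

```typescript
// Hypothetical telemetry payload.
interface TelemetryEvent {
  dataPointId: string;
  t: string;      // ISO-8601 timestamp
  v: number;
}

// Serialises one Server-Sent Events frame. Multi-line data is split into
// separate `data:` lines, and the frame ends with an empty line.
function sseFrame(data: string, opts: { id?: string; event?: string } = {}): string {
  let frame = '';
  if (opts.id !== undefined) frame += `id: ${opts.id}\n`;
  if (opts.event !== undefined) frame += `event: ${opts.event}\n`;
  for (const line of data.split(/\r\n|\r|\n/)) frame += `data: ${line}\n`;
  return frame + '\n';
}

// Hypothetical helper: one reading becomes one frame, with a stream cursor as its id.
function telemetryFrame(e: TelemetryEvent, cursor: string): string {
  return sseFrame(JSON.stringify(e), { id: cursor, event: 'telemetry' });
}
```

On the server these frames would be written to a long-lived response sent with `Content-Type: text/event-stream`.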

Epic 3.E — Tagging & ontology

  • 3.E.1 Brick Schema seed.
  • 3.E.2 Tag CRUD + UI.
  • 3.E.3 Auto-suggester (pattern → tags).

Epic 3.F — Bulk import + reconciliation

  • 3.F.1 Excel import wizard (with KTC Asset Ledger as fixture).
  • 3.F.2 CSV import wizard with encoding detection (UTF-16 supported — KTC fixture).
  • 3.F.3 DataPoint mapping import.
  • 3.F.4 Reconciliation engine + report UI.

Epic 3.G — Edge gateway agent

  • 3.G.1 Node.js binary scaffold; config file; cert-based auth.
  • 3.G.2 BACnet client; subscribe to configured points.
  • 3.G.3 Local SQLite buffer; store-and-forward.
  • 3.G.4 MQTTS uplink with QoS 1.
  • 3.G.5 Health beacon + remote config refresh.
  • 3.G.6 Installer / systemd unit / Docker image for the gateway.

8. Acceptance Criteria

  • AC3.1 — A new tenant can create a Gateway record, generate gateway credentials, install the edge agent, and within 10 minutes see live BACnet points flowing into the telemetry time-series.
  • AC3.2 — POST /api/v1/ingest/rest accepts a 1k-event batch in <500 ms p95 and persists it with a DQ score.
  • AC3.3 — The KTC Asset Ledger (8,180 rows) imports successfully via the Excel wizard with a reconciliation report against the BMS Equipment List (953 rows).
  • AC3.4 — The KTC 2024 work-order CSV (UTF-16) imports successfully via the CSV wizard.
  • AC3.5 — Telemetry query for "last 24h of DataPoint X at 5m aggregation" returns in <300 ms p95.
  • AC3.6 — SSE stream for a tenant's telemetry delivers events within 2 seconds p95 of ingest.
  • AC3.7 — Tagging allows attaching Brick concepts to DataPoints; query "all Temperature_Sensor in Building Y" returns expected list.
  • AC3.8 — Reconciliation report identifies known KTC mismatches (orphans, duplicates) and allows resolution with audit trail.

9. Test Requirements

  • Unit: ≥80% coverage on repos + pipeline.
  • Integration: every connector tested against a simulated source (BACnet emulator, MQTT broker, local server).
  • Load: 5k events/sec sustained for 10 minutes; p95 ingest persistence <500 ms.
  • e2e: Excel import wizard, CSV import wizard, reconciliation flow.
  • Contract: ingest endpoints validated against OpenAPI in tests.
  • Fault: simulate disconnect/buffer/replay on edge agent.
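The p95 thresholds in the load test and in AC3.2, AC3.5, and AC3.6 need an agreed percentile definition. A minimal sketch using the nearest-rank method (the value at rank ⌈0.95·n⌉ of the sorted samples); nearest-rank is an assumption, since load tools may interpolate and report slightly different values, and `percentile` / `meetsP95` are hypothetical helpers.

```typescript
// Nearest-rank percentile: sort ascending, take the value at rank ceil(p * n / 100).
function percentile(samples: readonly number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  if (p <= 0 || p > 100) throw new Error('p must be in (0, 100]');
  const sorted = samples.slice().sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[rank - 1];
}

// Example gate: p95 ingest persistence latency must stay under 500 ms.
function meetsP95(latenciesMs: readonly number[], thresholdMs = 500): boolean {
  return percentile(latenciesMs, 95) < thresholdMs;
}
```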

10. Documentation Requirements

  • docs/connectors/bacnet.md, mqtt.md, rest.md, webhook.md, file.md, modbus.md, opcua.md.
  • docs/data/ontology.md (Brick Schema usage).
  • docs/data/import_wizards.md.
  • docs/data/reconciliation.md.
  • docs/runbooks/gateway_install.md.
  • docs/observability/telemetry_dashboards.md (initial Grafana panels).
  • ADR-015: MQTT broker choice.
  • ADR-016: Time-series storage choice (TS collection vs. external TSDB).

11. Sign-off Criteria (Gate G3)

  • All Acceptance Criteria met.
  • Engineering Lead, Data Platform Lead, Security Lead, Product Owner sign _gates/Gate_G3_signoff.md.
  • KTC sample import successful in staging (proven end-to-end with the 8 KTC source files).
  • Tagged phase-3-v1.0.

12. Risks & Mitigations

Risk                                                Likelihood  Impact  Mitigation
Time-series throughput insufficient at scale        2           4       Spike 1.D.1 done; horizontal shard + sharded TS collection plan.
BACnet device discovery flaky on heterogeneous BMS  3           3       Manual mapping fallback; vendor-by-vendor adapter notes.
MQTT broker operational burden                      2           3       Use a managed broker (EMQX Cloud or HiveMQ) in production.
Importing 8k+ Asset Ledger slow / locks DB          2           3       Stream the import; batch size 500; background job.
Edge agent install on customer hardware fragile     3           3       Provide a Docker image as the primary path; binary as fallback.
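The import mitigation (stream the file, write in batches of 500, run as a background job) can be sketched as an async generator that groups rows into fixed-size chunks, so memory use and per-write lock time stay bounded regardless of file size. The `writeBatch` callback stands in for a bulk insert (for example the driver's `insertMany`) and is an assumption, as are the `chunk` and `importInBatches` names.

```typescript
// Hypothetical helper: groups any (async) row stream into arrays of at most `size` rows.
async function* chunk<T>(rows: AsyncIterable<T> | Iterable<T>, size = 500): AsyncGenerator<T[]> {
  let buf: T[] = [];
  for await (const row of rows) {
    buf.push(row);
    if (buf.length === size) {
      yield buf;
      buf = [];
    }
  }
  if (buf.length > 0) yield buf; // final partial batch
}

// Writes batches one at a time and reports progress, e.g. to update importJob counts.
async function importInBatches<T>(
  rows: AsyncIterable<T> | Iterable<T>,
  writeBatch: (batch: T[]) => Promise<void>,
  onProgress: (written: number) => void = () => {},
  size = 500,
): Promise<number> {
  let written = 0;
  for await (const batch of chunk(rows, size)) {
    await writeBatch(batch);
    written += batch.length;
    onProgress(written);
  }
  return written;
}
```

Writing batches sequentially rather than in parallel trades some throughput for predictable load on the database during the 8,180-row ledger import.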