proposal draft

Shadow Vampire: Monolith Data Extraction & Shadow Pipeline Strategy

Bob Matsuoka Updated 2026-03-11


Date: 2026-02-28 Audience: CTO / Engineering Leadership, Platform Team Watchwords: Minimal impact, safe, unobtrusive


1. Executive Summary for Leadership

The Strategy in Plain English

Shadow means we build a parallel pipeline that runs alongside the monolith. The monolith is the oracle -- its outputs are ground truth. The shadow validates its own logic against the oracle before it ever touches production traffic. When the shadow reliably matches the oracle, we shift traffic. We never guess about correctness -- we prove it.

Vampire means we extract data through thin needles -- the existing Kinesis SaveHooks infrastructure, the gcdb-stream-processor Anti-Corruption Layer, and a small number of targeted new insertion points. The monolith is never aware it is being drained. No surgery. No anesthesia. Just thin needles collecting what flows naturally.

The core goal: Extract data before and after each transformation stage in the monolith. Use those real before/after pairs as training data and validation oracles. We are not reimplementing the monolith's logic. We are recreating its outcomes using ML, LLMs, and modern event-driven engineering.

Key Numbers

Metric Value
Timeline ~20 weeks (4 phases)
Agentic dev cost ~$1,740--3,205 (LLM-assisted code generation)
LLM fine-tuning ~$1,200 (PMS normalizer training)
Runtime cost ~$140/month (inference + storage + Kinesis)
Year 1 total ~$2,952 (all inference costs)
Engineering cost ~$30K--50K (human review + deployment)

ROI Comparison

Investment Type Traditional Rewrite Shadow Vampire (Agentic)
Engineering cost ~$400K--600K ~$30K--50K
API / inference credits $0 ~$1,740--3,205
Timeline 18--24 months 20 weeks
Risk High (big-bang rewrite) Very low (shadow validates against oracle)

Risk Posture

  • The monolith is never modified structurally. All extraction is additive (new hooks alongside existing ones).
  • Feature flag rollback at every phase. The existing gcdb-stream-trigger permission provides instant rollback for any streaming change.
  • Divergence gating enforces correctness. No traffic shifts until the shadow proves <0.1% divergence over 7 days.
  • Shadow mode means zero production impact. The shadow pipeline reads from the same Kinesis stream but writes to a separate data lake. Production traffic is never at risk.

2. Problem Statement

The Duetto monolith is the system of record for 20+ entity types stored in MongoDB, streamed via Kinesis, and served through GraphQL + 18 legacy REST controllers. It handles PMS integration (ETL ingest), entity enrichment, cube computation, rate optimization, and outbound rate push.

Why we cannot simply rewrite it:

  1. Tightly coupled modules. The data, api, etl, frontend, and data-export modules share entity models, SaveHooks, and computation logic. Feature-by-feature extraction requires understanding every dependency.

  2. Incomplete extraction coverage. Most AFTER states are captured (Kinesis streams post-save entities), but most BEFORE states are missing. Without before/after pairs, we cannot validate that a replacement produces the same outputs.

  3. Transient computation. The most valuable data -- multi-dimensional cubes computed by RulesDataQueryV2 -- exists only in memory. It is never persisted or streamed.

  4. 18 legacy REST controllers. The frontend still depends on REST endpoints that bypass the GraphQL layer, making it impossible to route shadow traffic through a single control plane.

The result: Platform modernization is blocked by the inability to validate replacement correctness. A traditional rewrite would cost $400K--600K over 18--24 months with high risk and no way to prove the new system matches the old one.


3. Proposed Solution: The Shadow Vampire Pattern

Architecture: Current State

+---------------------------------------------------------------------------+
|                          EXTERNAL SYSTEMS                                  |
|  PMS / OTA / CRS -------------------------------- Rate Shopper / DRE      |
+----------+-----------------------------------------------------+----------+
           | ETL Inbound                                          | ETL Outbound
           v                                                      |
+--------------------------------+                                |
|         ETL MODULE             |                                |
|  (Data ingestion, OTA XML,     |                                |
|   rate push, restriction push) |                                |
+----------+---------------------+                                |
           v                                                      v
+---------------------------------------------------------------------------+
|                          CORE MONOLITH                                     |
|                                                                            |
|   MongoDB <---- DaoRuntime ----> CompositeSaveHooks ----> Kinesis ----->  |
|     (system of record)           (20+ entities streaming)                 |
|                                                                            |
|   API MODULE (computation, transformation, enrichment)                     |
|   FRONTEND MODULE (GraphQL primary + 18 legacy REST controllers)           |
+---------------------------------------------------------------------------+
        | Kinesis stream                    | GraphQL / REST
        v                                   v
+---------------------------+   +--------------------------------------------+
|  gcdb-stream-processor    |   |  data-export module                        |
|  (Anti-Corruption Layer)  |   |  (20 Parquet exporters -> S3, batch)       |
|  Kinesis -> Firehose -> S3|   |                                            |
+---------------------------+   +--------------------------------------------+

Architecture: Target State (Shadow Vampire)

+---------------------------------------------------------------------------+
|  MONOLITH (oracle -- never modified structurally)                          |
|                                                                            |
|  Stage 1: PMS -> ETL -> MongoDB        NEEDLE A: ETL queue tap            |
|  Stage 2: MongoDB -> Kinesis           NEEDLE B: SaveHooks (already in)   |
|  Stage 3: Kinesis -> Transformed       NEEDLE C: gcdb-stream-processor    |
|  Stage 4: State -> Output              NEEDLE D: Output capture hooks     |
+---------------------------------------------------------------------------+
           |                  |                 |                  |
           v BEFORE           v BEFORE+AFTER    v AFTER            v AFTER
+------------------------------------------------------------------------+
|                    SHADOW DATA LAKE                                      |
|  Raw PMS payloads  |  Entity events  |  Transformed msgs |  Output state|
|  (pre-transform)   |  (post-save)    |  (post-gcdb)      |  (final)     |
+------------------------------------------------------------------------+
           |                  |                 |                  |
           +------------------+-----------------+------------------+
                                      |
                               +-------v-----------+
                              |  SHADOW PIPELINE  |
                              |  (ML / LLM /      |
                              |   modern eng.)    |
                              +-------+-----------+
                                      |  Outputs
                                      v
                              +-------------------+
                              |   DIVERGENCE      |
                              |   DETECTOR        | <-- compares shadow
                              |   (shadow vs      |     output vs monolith
                              |    monolith)      |     output
                              +-------------------+
                                      | When divergence < threshold
                                      v
                              Route traffic to shadow

Module Dependency Map

frontend  --> api  (computation, resolvers)
frontend  --> data (DAO runtime, entities, permissions)
api       --> data (SaveHooks, MongoDoc, Translators)
etl       --> api  (rate push pipeline)
etl       --> data (DAO access)
data-export -> data (entity access)
gcdb-stream-processor --> data (entity models, translators)
gcdb-stream-processor --> api  (enrichment context)

Key insight: data is the lowest-level module. Every entity (MongoDoc + TRANSLATOR) can have a thin needle inserted via SaveHooks with zero business logic changes.

VampireExtractor: Single Extraction Class

All extraction points route through one class. One needle, many insertion points.

@Component
@RequiredArgsConstructor
@Slf4j
public class VampireExtractor {

    private final EventPublisherService kinesisPublisher;
    private final S3Client s3Client;
    private final PermissionService permissionService;

    @Value("${vampire.s3.bucket:shadow-data-lake}")
    private String shadowBucket;

    // THIN NEEDLE: capture entity state (AFTER)
    // Called from any SaveHook.postModification()
    public <T extends MongoDoc<?>> void extract(
        Class<T> entityClass,
        OperationType operationType,
        Collection<T> entities,
        ExtractionStage stage
    ) {
        // allowedToExtract() delegates to permissionService
        // (honors the GC_STREAM_DISABLE_STREAMING gate; helper body elided)
        List<T> allowed = entities.stream()
            .filter(this::allowedToExtract)
            .collect(Collectors.toList());
        if (allowed.isEmpty()) return;

        VampireEvent<T> event = VampireEvent.<T>builder()
            .entityClass(entityClass.getName())
            .operationType(operationType)
            .stage(stage)
            .extractedAt(Instant.now())
            .entities(allowed)
            .build();

        if (estimateSizeBytes(event) < 900_000) {  // <900KB -> Kinesis
            publishToKinesis(event);
        } else {                                    // >=900KB -> S3
            publishToS3(event);
        }
    }

    // THIN NEEDLE: capture raw/unstructured payload (BEFORE)
    // Stage 1 (raw PMS payload) and Stage 3 (cube snapshots)
    public void extractRaw(
        String sourceName,
        String hotelId,
        String entityId,
        ExtractionStage stage,
        byte[] rawPayload
    ) {
        // -> S3: before/{stage}/{source}/hotel={id}/{date}/{entityId}.bin
        // Failure is caught and logged -- never affects primary write path
    }
}

Routing logic: Kinesis for small entity events (the 900KB threshold leaves headroom under the 1MB Kinesis record limit); S3 for large payloads (raw PMS XML, cube snapshots). Extraction respects the GC_STREAM_DISABLE_STREAMING permission (the same gate as the existing hooks) and adds shadow metadata so extraction events are distinguishable from production events.
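As a hypothetical illustration of the S3 key layout named in the extractRaw comment (the class and method here are assumptions for illustration, not existing code):

```java
// Hypothetical helper sketching the S3 key layout from extractRaw's comment:
// before/{stage}/{source}/hotel={id}/{date}/{entityId}.bin
public class ShadowKeys {

    static String rawPayloadKey(String stage, String source,
                                String hotelId, String date, String entityId) {
        // One object per captured payload, partitioned by stage/source/hotel/date
        return String.format("before/%s/%s/hotel=%s/%s/%s.bin",
                stage, source, hotelId, date, entityId);
    }
}
```

Partitioning by hotel and date keeps Athena scans over the shadow data lake cheap and makes per-hotel backfill or deletion straightforward.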


4. Data Transformation Journey

At each transformation stage, we identify the BEFORE state, AFTER state, extraction mechanism (the needle), and what the before/after pair enables in the shadow pipeline.

Stage 1: External Signal -> Normalized Entity (ETL Ingest)

BEFORE:  Raw PMS payload -- OTA XML, HTNG messages, Opera/Apaleo API responses
         IntegrationMessageDoc { payload: raw bytes, integrationType, hotelId }

TRANSFORMATION: IntegrationFacade.process()
    -> parses PMS-specific format
    -> maps to Duetto domain model (Booking, InventoryValue, RateBounds, etc.)
    -> resolves hotel/segment/roomType references
    -> writes normalized MongoDoc via DaoRuntime

AFTER:   Normalized MongoDoc (e.g., Booking, InventoryValue)
         Serialized via TRANSLATOR -> DBObject -> Kinesis event

SHADOW USE:
    -> Train normalization model: raw OTA XML -> Duetto entity
    -> Validate: shadow normalization output == monolith output

Currently extracted: AFTER only (Kinesis stream has the post-normalization entity). Gap: BEFORE (raw PMS payloads) is not captured. Effort to close gap: Low -- add a pre-save tap to IntegrationMessageDoc.

Stage 2: Raw Entity -> Derived/Enriched Entity (Computation Layer)

BEFORE:  Raw entity as written (e.g., InventoryValue as received from PMS)

TRANSFORMATION: API module computation
    -> InventoryValue -> InventoryLatest (aggregation)
    -> Booking -> BookingStayDenorm (denormalization with segment enrichment)
    -> BlockHeader -> BlockDenorm (group block denormalization)
    -> RatePushStateMapDetail (derived from rate push events)

AFTER:   Derived entity with enrichment applied

SHADOW USE:
    -> Train enrichment model: raw entity -> derived entity
    -> Validate: shadow enrichment matches monolith derived entity

Currently extracted: AFTER (derived entities via Kinesis). Gap: BEFORE needs explicit correlation (same hotelId/entityId, close timestamps). Effort to close gap: Low -- both streams exist; need correlation key + event windowing.
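A minimal sketch of that correlation step, assuming events carry hotelId, entityId, and a timestamp (the event record and windowing policy here are illustrative, not the production schema):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.*;

// Sketch: pair BEFORE and AFTER extraction events by hotelId + entityId
// within a bounded time window. Event shape is illustrative.
public class BeforeAfterCorrelator {

    record ExtractedEvent(String hotelId, String entityId, String stage, Instant at) {}

    /** Pair each BEFORE event with the first AFTER event for the same
     *  hotelId+entityId that lands inside the correlation window. */
    static Map<ExtractedEvent, ExtractedEvent> correlate(
            List<ExtractedEvent> befores,
            List<ExtractedEvent> afters,
            Duration window) {
        Map<ExtractedEvent, ExtractedEvent> pairs = new LinkedHashMap<>();
        for (ExtractedEvent b : befores) {
            afters.stream()
                .filter(a -> a.hotelId().equals(b.hotelId())
                          && a.entityId().equals(b.entityId())
                          && !a.at().isBefore(b.at())
                          && Duration.between(b.at(), a.at()).compareTo(window) <= 0)
                .findFirst()
                .ifPresent(a -> pairs.put(b, a));
        }
        return pairs;
    }
}
```

Unpaired events fall out of the map and can be logged as correlation gaps rather than treated as divergence.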

Stage 3: Entity State -> Aggregate View (Cube Computation)

BEFORE:  Individual entity documents in MongoDB
         (InventoryLatest per room type + date, BookingStayDenorm per hotel + date,
          Segment definitions, RollupSegmentation config)

TRANSFORMATION: RulesDataQueryV2 parallel cube assembly
    -> OtbMoleculeQuery       -> OTB (on-the-books) cube
    -> CommitQueries.DENSE    -> Committed bookings cube
    -> DemandQueries          -> Demand forecast curve
    -> InventoryCubesQuery    -> Availability cube
    -> Demand360OccupancyCube -> Competitor occupancy

AFTER:   Multi-dimensional cubes: Function<DayOnly, Table<segmentId, RulesFnType, Integer>>
         Computed in-memory; NOT currently persisted or streamed.

SHADOW USE:
    -> Richest training signal: inputs + outputs of aggregation
    -> Train aggregation model: entity snapshots -> cube values
    -> Enables: shadow cube service (event-sourced aggregate)

Currently extracted: Nothing -- these cubes are transient in-memory objects. Gap: Significant. This is the most valuable extraction target. Effort to close gap: Medium -- requires instrumentation in RulesDataQueryV2.

Stage 4: User Action / Schedule -> State Change (Write Path)

BEFORE:  User intent (rate override, rule change, restriction update)
         Or: Scheduled optimization trigger (HotelMessageDoc)

TRANSFORMATION: API module write path
    -> Validates business rules (RateBounds, permissions)
    -> Applies cascading updates (SubRate -> PricingRule propagation)
    -> Persists to MongoDB -> SaveHooks -> Kinesis

AFTER:   Updated entity state in Kinesis stream

SHADOW USE:
    -> State machine validation: apply same event sequence to shadow
    -> Verify shadow reaches same state after N events
    -> Foundation for event-sourced shadow state

Currently extracted: Fully covered for 20+ entities. The SaveHooks infrastructure IS the needle.

Stage 5: State -> Outbound Push (ETL Outbound)

BEFORE:  Monolith's current rate/restriction state (RatePushStateMapDetail pre-push)

TRANSFORMATION: RatePushService -> DreFacade -> HTTP to DRE/OTA
    -> Packages rates per PMS format
    -> Applies push batching / partition logic
    -> Delivers to DRE or OTA endpoint

AFTER:   RatePushStateMapDetail with push result (success/failure)

SHADOW USE:
    -> Complete audit trail for any rate delivered to any PMS
    -> Shadow can validate it would deliver the same rates

Currently extracted: AFTER (post-push state via Kinesis). Gap: Pre-push rate values (BEFORE) need correlation from RatePublicValue stream.

Extraction Coverage Summary

Stage Before After Gap Effort
1: PMS Ingest Missing Kinesis Raw payloads not captured Low
2: Entity Enrichment Partial Kinesis Need before/after correlation Low
3: Cube Aggregation Missing Missing Transient in-memory; not streamed Medium
4: Write Path Full Kinesis None -- SaveHooks cover this fully None
5: Outbound Push Partial Kinesis Pre-push values need correlation Low

5. Extraction Points (The Thin Needles)

Six needles extract data without the monolith's awareness. Four are already active (one of them on-demand only), and two are new.

Extraction Points Summary

Needle Status Stage Data Extracted Risk
Kinesis SaveHooks Active 2, 4 20+ entity types, all writes None
gcdb-stream-processor Active 2->3 Transformed messages -> Firehose None
BackfillService Active Historical Full entity history Very low
ETL Queue Tap New 1 (BEFORE) Raw PMS payloads Very low
Cube Computation Capture New 3 (BEFORE+AFTER) Aggregated cube data Low
GraphQL API Active On-demand Structured queries None

Needle 1: Kinesis SaveHooks Stream (Active)

Every MongoDB write for 20+ entity types flows through SaveHooks. CREATE, UPDATE, DELETE with full entity body serialized via TRANSLATOR.serialize().

MongoDB save
    -> DaoRuntime
    -> CompositeSaveHooks.postModification()
    -> GcStreamOperationEventUtils.publishToStream()
        -> filters GC_STREAM_DISABLE_STREAMING permission
        -> EventPublisherService.publishEvents()
        -> KinesisEventPublisherService -> KPL -> Kinesis Stream

Missing entities: BlockHeader (DP-338), BlockDenorm (DP-295), Hotel, HotelSetup, Booking.

Needle 2: gcdb-stream-processor (Active -- The ACL)

Consumes raw Kinesis events, routes by entity type through EntityTransformer, publishes via GcEntityPublisher to Firehose and S3/Athena. Translates internal MongoDB types (DBObject) to public-contract types (Message<T>). 22 entity types registered:

BlockDenorm, BlockHeader, BookingStayDenorm, Company, CompanySetup, Demand360BookingDetail, Demand360Competitor, Demand360Compset, Demand360GroupDetail, InventoryLatest, InventoryOverride, InventoryValue, PriceOverride, PricingRule, RateBounds, RatePushStateMapDetail, RatePublicHistory, RatePublicValue, Relationship, RestrictionPushStateMapDetail, RollupSegmentation, Segment.

Needle 3: BackfillService (Active -- Historical Bootstrap)

Closes the "cold start" problem. Any new consumer needing historical data can bootstrap via BackfillService:

POST /internal/api/v1/backfill/initiate
{
  "entityType": "Booking",
  "hotelIds": ["hotel-id-1"],
  "companyIds": ["company-id"],
  "startDay": "2022-01-01",
  "endDay": "2026-01-01",
  "documentsPerSecond": 1000
}
-> Creates Redis-backed BackfillTicket
-> HistoricalLoader streams historical docs through the SAME Kinesis pipeline
-> No gap between historical and live data for new consumers

The shadow pipeline must consume from Kinesis (historical + live, no gap) rather than polling MongoDB directly. BackfillService makes this possible.

Needle 4: ETL Queue Tap (New -- Stage 1 BEFORE Extraction)

A pre-processing hook on IntegrationMessageDoc that writes the raw payload (before any transformation) to S3.

@Component
@RequiredArgsConstructor
public class IntegrationMessagePreCapture extends SaveHooks<IntegrationMessageDoc> {
    private final VampireExtractor vampireExtractor;

    @Override
    public Optional<String> preBulkInsert(
        IntegrationMessageDoc doc, BulkSaveStats stats
    ) {
        vampireExtractor.extractRaw(
            doc.getIntegrationType().name(),
            doc.getHotelId(),
            doc.getId(),
            ExtractionStage.PMS_INGEST_BEFORE,
            doc.getPayload().getBytes(StandardCharsets.UTF_8)
        );
        return Optional.empty();  // Never cancel; purely observational
    }
}

Risk: Near-zero. preBulkInsert is called before the save; returning Optional.empty() means "proceed normally." Any exception in extractRaw is caught and logged; the save continues regardless.

Needle 5: Cube Computation Capture (New -- Stage 3 BEFORE/AFTER)

Instrumentation inside RulesDataQueryV2 that publishes assembled cubes to S3 after parallel query completion.

// BEFORE cube assembly (capture input entity snapshot):
vampireExtractor.extractRaw(
    "cube-inputs", spec.getDataConfig().getHotelId(),
    spec.getDataConfig().getHotelId() + "-" + spec.getDataConfig().getDateRange().start(),
    ExtractionStage.CUBE_ASSEMBLY_BEFORE,
    CubeInputSnapshot.serialize(spec).getBytes()
);

// AFTER cube assembly (capture output):
vampireExtractor.extractRaw(
    "cube-outputs", spec.getDataConfig().getHotelId(),
    spec.getDataConfig().getHotelId() + "-" + spec.getDataConfig().getDateRange().start(),
    ExtractionStage.CUBE_ASSEMBLY_AFTER,
    CubeOutputSnapshot.serialize(cubeResult).getBytes()
);

Risk: Low. Both calls are async (S3 non-blocking write). Failure is logged and swallowed -- cube computation is unaffected.

Needle 6: GraphQL API (Active -- On-Demand)

POST /graphql -- on-demand queries for structured data. Not suitable for bulk streaming. Best for querying current state for validation, ad-hoc shadow comparisons, and debugging.


6. Shadow Pipeline Architecture

The Shadow Pattern

The shadow pipeline is a parallel system that:

  1. Consumes the same input events as the monolith (via Kinesis)
  2. Applies new transformation logic (ML/LLM/modern engineering)
  3. Produces outputs in the same schema as the monolith
  4. Sends outputs to a Divergence Detector that compares shadow vs. oracle

                    Monolith (oracle)
                        |
                    Kinesis stream
                   /              \
          Shadow pipeline       gcdb-stream-processor
                |                        |
          Shadow outputs         Monolith outputs
                \                /
             Divergence Detector
                     |
           +---------+----------+
           | MATCH   |  DIVERGE |
           +---------+----------+
                |           |
            confidence++  log + alert

Divergence Detection

The Divergence Detector consumes both shadow and monolith outputs for the same event (correlated by hotelId + entityId + timestamp window) and emits:

{
  "entityType": "InventoryLatest",
  "hotelId": "hotel-123",
  "entityId": "inv-456",
  "monolithOutput": { },
  "shadowOutput": { },
  "diverged": false,
  "diffFields": [],
  "confidence": 0.997,
  "windowMs": 250
}

Divergence metrics feed a dashboard and alerting system. When divergence rate drops below threshold (e.g., <0.1% over 7 days for a hotel segment), that entity type is ready for traffic shifting.
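The field-level comparison can be sketched as follows, assuming both outputs have been flattened to field-to-value maps (correlation, nesting, and tolerance handling are elided; names are illustrative):

```java
import java.util.*;

// Sketch of the Divergence Detector's field-level comparison: walk the union
// of field names and record any field where the two outputs disagree.
public class DivergenceCheck {

    record Result(boolean diverged, List<String> diffFields) {}

    static Result compare(Map<String, Object> monolithOutput,
                          Map<String, Object> shadowOutput) {
        List<String> diffs = new ArrayList<>();
        Set<String> fields = new TreeSet<>(monolithOutput.keySet());
        fields.addAll(shadowOutput.keySet());          // fields missing on one side count as diffs
        for (String field : fields) {
            if (!Objects.equals(monolithOutput.get(field), shadowOutput.get(field))) {
                diffs.add(field);
            }
        }
        return new Result(!diffs.isEmpty(), diffs);
    }
}
```

The emitted diffFields list is what makes divergence actionable: tuning effort goes to the specific fields that disagree, not the whole entity.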

Traffic Shifting Strategy

Traffic shifting is not big-bang. Roll out hotel by hotel or entity type by entity type:

Week 1:   Shadow reads from Kinesis (shadow mode -- no writes)
          Divergence Detector runs; all mismatches logged
Week 2-4: Tune shadow logic; divergence drops
Week 5:   Shadow mode for 5% of hotels (selected by low-risk profile)
Week 8:   Shadow mode for 25% of hotels
Week 12:  Shadow mode for 100% if divergence < 0.1%
Week 16:  Traffic shifted; monolith becomes fallback

Rollback at any point: flip feature flag. Monolith never changes.

State Management in Shadow

The shadow pipeline maintains state equivalent to MongoDB via event sourcing:

Event-Sourced State Store (e.g., Apache Flink, Kafka Streams, or custom)
    -> Consumes Kinesis events in order
    -> Maintains per-entity state (last known value)
    -> Provides "current state" for any entity at any point in time
    -> BackfillService seeds the initial state
    -> Live Kinesis keeps it current

This is the shadow's "memory" -- it knows what the monolith knows, without touching MongoDB.
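A minimal sketch of that last-known-value store, assuming ordered delivery per entity key (the event shape is illustrative; a production version would hold this state in Flink or Kafka Streams as described above):

```java
import java.util.*;

// Sketch of the shadow's event-sourced "memory": replay ordered entity events
// (BackfillService history first, then live Kinesis) into a last-known-value
// store keyed by entity.
public class ShadowStateStore {

    record EntityEvent(String entityKey, String op, Map<String, Object> body) {}

    private final Map<String, Map<String, Object>> state = new HashMap<>();

    void apply(EntityEvent e) {
        if ("DELETE".equals(e.op())) {
            state.remove(e.entityKey());
        } else {
            state.put(e.entityKey(), e.body());   // CREATE/UPDATE: last write wins
        }
    }

    Optional<Map<String, Object>> current(String entityKey) {
        return Optional.ofNullable(state.get(entityKey));
    }
}
```

Because BackfillService replays history through the same Kinesis pipeline, seeding and live updates use one code path: apply() in event order.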


7. FE API Extraction

The FE API extraction removes the 18 legacy REST controllers by migrating FE data access to GraphQL. This is a thin needle: GraphQL resolvers delegate to existing services -- no business logic change.

REST Controllers to Migrate

Controller Path Priority GraphQL Target
ForecastController /forecast/* HIGH HotelQuery.forecast(...)
FinancialForecastReportController /forecast/financial/* HIGH HotelQuery.forecast.financial(...)
FinancialForecastSnapshotController /forecast/financial/snapshot/* HIGH HotelQuery.forecast.snapshot(...)
BookingHistoryController /booking/history/* HIGH HotelQuery.bookingHistory(...)
RatePublicHistoryController /rate/public/history/* HIGH HotelQuery.ratePublicHistory(...)
RatePublicValueController /rate/public/value/* HIGH HotelQuery.ratePublicValue(...)
RatePublicLatestController /rate/public/latest/* HIGH HotelQuery.ratePublicLatest(...)
PromotionController /promotion/* MEDIUM HotelQuery.promotions + mutations
PromotionPlanController /promotion/plan/* MEDIUM HotelQuery.promotionPlans
Demand360CompsetsController /demand360/compsets/* MEDIUM HotelQuery.demand360Compsets
StrDailyIndexController /str/daily/* MEDIUM HotelQuery.strDaily(...)
StrMonthlyIndexController /str/monthly/* MEDIUM HotelQuery.strMonthly(...)
EnterpriseRegionIndexController /enterprise/* LOW HotelQuery.enterpriseRegion(...)
ScheduledReportController /scheduled-reports/* LOW HotelQuery.scheduledReports
CompanyController /company/* LOW CompanyQuery.*
BlockHeaderController /settings/blockheader/* LOW HotelMutation.blockHeader.*
DemandDebuggerController /debug/demand/* LOW AdminQuery.demandDebug(...)
ForecastDebuggerController /debug/forecast/* LOW AdminQuery.forecastDebug(...)

Migration Pattern (Thin Needle)

Each REST migration follows the same pattern. No business logic is duplicated:

Step 1: Add GraphQL type definitions to schema.graphqls
Step 2: Implement Java resolver class (delegates to EXISTING service)
Step 3: Update React component to use Apollo/GraphQL query
Step 4: Dual-run for 1 sprint (REST + GraphQL both active)
Step 5: Remove REST controller

Critical rule: The resolver layer is PURE API TRANSLATION.
               Resolvers call existing services. Full stop.

Why This Matters for Shadow Vampire

GraphQL migration is not just API cleanup -- it is a prerequisite for the shadow pipeline's FE layer. Once all FE data flows through GraphQL:

FE -> GraphQL -> (resolver) -> [SHADOW SWITCH]
                                    +-- monolith service (current)
                                    +-- shadow service (new)
                                         ^
                                   feature flag per hotel

The GraphQL layer becomes the control plane for shadow traffic routing. No FE changes required for A/B testing between monolith and shadow implementations.
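A sketch of the per-hotel switch inside a resolver (the interface and names are hypothetical; the real resolvers delegate to existing services as described above):

```java
import java.util.function.Predicate;

// Sketch of the SHADOW SWITCH in the diagram: the resolver picks monolith or
// shadow backend per hotel via a feature-flag lookup. Names are illustrative.
public class ForecastResolver {

    interface ForecastService { String forecast(String hotelId); }

    private final ForecastService monolith;
    private final ForecastService shadow;
    private final Predicate<String> shadowEnabledForHotel;  // feature flag lookup

    ForecastResolver(ForecastService monolith, ForecastService shadow,
                     Predicate<String> shadowEnabledForHotel) {
        this.monolith = monolith;
        this.shadow = shadow;
        this.shadowEnabledForHotel = shadowEnabledForHotel;
    }

    String forecast(String hotelId) {
        // Rollback is flipping the flag; the resolver interface never changes,
        // so the FE is unaware of which backend served the query.
        ForecastService target =
            shadowEnabledForHotel.test(hotelId) ? shadow : monolith;
        return target.forecast(hotelId);
    }
}
```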


8. ML/LLM Approach

The Philosophy

We are not reverse-engineering the monolith's business logic. We are training systems to reproduce its outputs from the same inputs. The monolith serves as the labeled training set: every before/after pair we extract is a training example.

This is fundamentally different from "rewrite the rules engine." We do not need to understand why the monolith produces a given output. We need our shadow to produce the same output given the same input -- and we have ground truth to measure against.

Approach by Transformation Stage

Stage Method Rationale
PMS normalization LLM (generative) High format variation; LLMs handle naturally
Entity enrichment LLM-translated rules Deterministic; translation is accurate
Cube aggregation LLM-translated + shadow validation Complex but deterministic
State management Event sourcing (pure engineering) No ML needed; well-solved problem
FE API layer LLM for boilerplate generation GraphQL resolvers are repetitive

Stage 1 -- PMS Normalization: Fine-tuned LLM (Llama 3.1 8B) trained on (raw payload, normalized entity) pairs. PMS formats are inconsistent, version-dependent, and vendor-specific -- LLMs handle format variation naturally. Target: >99.9% field-level agreement before traffic shift.

Stage 2 -- Entity Enrichment: LLM reads existing Java transformer code and generates equivalent Python/Go/TypeScript logic. Most enrichment is deterministic (aggregation, denormalization). "Translate this Java method to Python" is a well-solved LLM task.

Stage 3 -- Cube Aggregation: LLM reads RulesDataQueryV2.java and generates event-sourced aggregation logic. If LLM translation has residual error, fine-tune a lightweight regression model on (snapshot -> cube) pairs to correct bias. Target: <0.01% mean absolute error.

Stage 4 -- State Management: Event sourcing via Apache Flink or Kafka Streams. No ML needed. Consumes Kinesis events in order, maintains per-entity state store.

LLM-Assisted Code Migration

For deterministic transformations, LLMs translate existing Java logic to modern service implementations:

1. Read Java source (e.g., InventoryLatestTransformer.java)
2. LLM: "Translate this Java class to a Python Flink operator"
3. Unit test against known before/after pairs from extraction layer
4. Divergence Detector validates at scale (100K+ events)
5. Deploy when divergence < threshold

Cost: ~$5-15 per Java class translated (Claude API)
Quality bar: CI gate requires >99.5% agreement on held-out test set
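The CI gate in step 5 can be sketched as a simple agreement-rate check over the held-out set (comparison on serialized outputs is an assumption for illustration):

```java
import java.util.List;

// Sketch of the CI agreement gate: the fraction of shadow outputs that exactly
// match the oracle on a held-out set must clear the quality bar before a
// translated component ships.
public class AgreementGate {

    static double agreementRate(List<String> oracleOutputs, List<String> shadowOutputs) {
        int matches = 0;
        for (int i = 0; i < oracleOutputs.size(); i++) {
            if (oracleOutputs.get(i).equals(shadowOutputs.get(i))) matches++;
        }
        return (double) matches / oracleOutputs.size();
    }

    static boolean passesGate(double rate) {
        return rate >= 0.995;   // quality bar: at least 99.5% agreement
    }
}
```

In practice the same check runs per field (via the Divergence Detector) so that a failing translation points at specific disagreeing fields, not just a pass/fail verdict.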

Cost Estimates

One-Time Development Costs (LLM-Assisted Code Generation)

Task Model Cost Est.
Translate 23 EntityTransformer classes (Java -> Python) Sonnet ~$8.60
Generate 18 GraphQL resolvers + schema types Haiku ~$0.90
Generate VampireExtractor + 5 insertion point stubs Sonnet ~$0.75
Generate IntegrationMessagePreCapture hook Haiku ~$0.03
Generate RulesDataQueryV2 instrumentation patch Sonnet ~$1.44
Translate RulesDataQueryV2 cube assembly logic Sonnet ~$1.26
Translate PricingOptimizerService stages Sonnet ~$2.10
Generate CubeSnapshot Flink/Kafka job Sonnet ~$1.65
Generate Divergence Detector service Sonnet ~$1.35
Generate BackfillService consumer wrapper Haiku ~$0.10
Iteration + refinement (3x budget multiplier) Mixed ~$54
One-time dev total ~$72

These are API credit costs only. Engineer time for review, testing, and deployment is additional (estimated 2-5 days per major component).

Ongoing Runtime Inference Costs

Workload Monthly Est.
PMS normalization (fine-tuned Llama 3.1 8B, self-hosted) ~$100/mo
Shadow validation ad-hoc queries (Athena SQL, ~1TB/mo) ~$5/mo
S3 storage (shadow data lake, ~500GB/mo new data) ~$11.50/mo
Kinesis (VampireExtractor events, ~2x current volume) ~$22/mo
Monthly runtime total ~$140/mo

Fine-Tuning Cost (LLM PMS Normalizer)

Step Cost Est.
Training data prep (100K PMS message pairs) $0 (own data)
Fine-tune Llama 3.1 8B on 100K pairs ~$200-400
Evaluation against held-out test set (Claude scoring) ~$15
Re-fine-tune iterations (3x) ~$600-1,200
Fine-tuning total ~$815-1,615

Total Inference Cost Summary

Category One-Time Ongoing/Mo Year 1 Total
Code generation (LLM dev assist) ~$72 $0 $72
Fine-tuning (PMS normalizer) ~$1,200 $0 $1,200
Runtime inference $0 ~$140 ~$1,680
Grand total ~$1,272 ~$140/mo ~$2,952

Comparable traditional cost: ~$400K-600K engineering + ongoing maintenance at ~$50K/year.


9. Development Plan

Phases are structured around the metaphor: install the needles, collect the blood, build the shadow, prove the shadow, shift traffic.

Phase 0: Install the Needles (Weeks 1-2)

Goal: Complete streaming coverage. Every entity type flows through Kinesis. No gaps in the blood supply.

Step Action Effort Agentic Cost Risk
0.1 Enable BlockHeader + BlockDenorm streaming (uncomment ~14 lines, DP-338/DP-295) 30 min $10-20 Near-zero
0.2 Add Hotel + HotelSetup Kinesis streaming (new SaveHooks + transformers) 4-6 hrs each $40-80 Low
0.3 Add Booking Kinesis streaming (extend BookingChangeHooks) 4-6 hrs $40-80 Low

Phase 0 Total: ~$90-180 | 2-4 engineer-days | 1-2 weeks Deliverable: All critical entities streaming. Shadow data lake has complete blood supply.

Phase 1: Blood Collection -- Before/After Extraction (Weeks 2-5)

Goal: Close the extraction gaps. Capture BEFORE state for Stage 1 (raw PMS) and Stage 3 (cubes). Set up shadow data lake schema and Divergence Detector.

Step Action Effort Agentic Cost Risk
1.1 ETL Queue raw payload capture (Needle 4) 1 week $75-125 Very low
1.2 Cube Computation capture (Needle 5) 2 weeks $100-200 Low
1.3 Shadow Data Lake schema + Divergence Detector MVP 2 weeks $150-250 Low

Phase 1 Total: ~$325-575 | 3-5 engineer-weeks | 4 weeks Deliverable: Complete before/after extraction at all 5 stages. Divergence Detector running. Shadow data lake populated.

Phase 2: FE API Extraction (Weeks 4-12, overlaps Phase 1)

Goal: Migrate all 18 remaining REST controllers to GraphQL. GraphQL becomes the shadow pipeline's control plane for traffic routing.

Step Action Effort Agentic Cost Removes
2.1 Rate Public Data GraphQL 1 sprint $75-150 3 REST controllers
2.2 Booking History GraphQL 1 sprint $75-150 1 REST controller
2.3 Forecast GraphQL (most complex; dual-run 1 sprint) 3-4 sprints $200-350 3 REST controllers
2.4 Demand360 + STR GraphQL 2 sprints $100-175 3 REST controllers
2.5 Promotions + remaining controllers 2 sprints $125-225 8 REST controllers

Phase 2 Total: ~$575-1,050 | 4-6 engineer-weeks | 8 weeks
Deliverable: All FE data through GraphQL. 18 REST controllers removed. GraphQL layer ready as shadow traffic control plane.

Phase 3: Shadow Pipeline -- Build and Validate (Weeks 8-20)

Goal: Build shadow implementations for each transformation stage. Validate against monolith oracle. Achieve production-ready divergence rates.

Step Action Effort Agentic Cost Risk
3.1 Shadow Stage 1: LLM PMS Normalizer (fine-tune, deploy, A/B) 6-8 weeks $200-400 Medium
3.2 Shadow Stage 2: LLM-translated entity enrichment (Java -> Python/Go) 4-6 weeks $150-300 Low
3.3 Shadow Stage 3: Event-sourced cube service (Flink/Kafka Streams) 6-8 weeks $300-500 Medium
3.4 Divergence Rate production gate (per-hotel traffic shift) 2-4 weeks $100-200 Low

Phase 3 Total: ~$750-1,400 | 6-10 engineer-weeks | 12 weeks
Deliverable: Shadow pipeline in production for Stages 1-3. Divergence Detector as automated CI gate. Traffic shifting in progress.
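The per-hotel traffic-shift gate of step 3.4 can be sketched as a streak check over daily divergence rates. The threshold and window below are illustrative placeholders, not agreed SLOs:

```python
def ready_to_shift(daily_divergence_rates, threshold=0.001, required_days=14):
    """A hotel qualifies for traffic shift only after `required_days`
    consecutive days with divergence rate at or below `threshold`.
    Rates are ordered oldest to newest; any spike resets the streak."""
    streak = 0
    for rate in daily_divergence_rates:
        streak = streak + 1 if rate <= threshold else 0
    return streak >= required_days
```

Because the streak resets on any bad day, a hotel that regresses after shifting simply falls out of eligibility and continues to be served by the monolith, consistent with the "shadow validates against oracle" posture.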

Development Plan Summary

Phase Duration Agentic Cost Human Review Deliverable
0: Needles 2 weeks $90-180 2-4 days Complete streaming coverage
1: Collection 4 weeks $325-575 3-5 weeks Before/after extraction all stages
2: FE API 8 weeks $575-1,050 4-6 weeks 18 REST controllers -> GraphQL
3: Shadow Build 12 weeks $750-1,400 6-10 weeks Shadow pipeline in production
Total ~20 weeks ~$1,740-3,205 ~15-25 weeks Shadow Vampire operational

ROI

Investment Type Traditional Agentic (Shadow Vampire)
Engineering cost ~$400K-600K ~$30K-50K
API credits $0 ~$1,750-3,200
Timeline 18-24 months 20 weeks
Risk High (rewrite) Very low (shadow validates against oracle)

The monolith is never modified structurally. The risk floor is minimal: every phase has a feature flag rollback, and the Divergence Detector enforces correctness before any traffic shifts.


10. Risks & Mitigations

Risk Likelihood Impact Mitigation
Kinesis stream volume doubles with VampireExtractor events Medium Low VampireExtractor respects existing GC_STREAM_DISABLE_STREAMING permission. Shard auto-scaling. Estimated cost increase: ~$22/mo.
Cube Computation Capture (Needle 5) affects latency Low Medium S3 writes are async (non-blocking). cubePublisher is a no-op in dev. Failure is logged and swallowed.
LLM PMS Normalizer does not reach 99.9% accuracy Medium Medium Shadow mode means mismatches are logged, not served. Fallback to monolith is automatic. Can run shadow indefinitely until accuracy is proven.
Event ordering in shadow state store Medium High Well-solved via Flink/Kafka Streams ordered consumption by (partitionKey, sequenceNumber). BackfillService provides gap-free historical bootstrap.
GraphQL migration breaks FE features during dual-run Low Medium Dual-run period: REST + GraphQL both active for 1 sprint per controller. Feature flag per hotel. Rollback removes GraphQL route; REST continues.
Shadow pipeline contamination of production data Low High All VampireExtractor events tagged with source: "vampire". Shadow writes to separate S3 prefix (shadow-data-lake). Divergence Detector filters by tag.
Scope creep beyond 20 weeks Medium Medium Phases are independent. Phase 0+1 deliver value (complete extraction) even if Phases 2-3 are deferred. Each phase has its own deliverable and can be paused.
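The event-ordering mitigation above (ordered consumption by (partitionKey, sequenceNumber)) reduces to a classic per-key reorder buffer. A minimal in-memory sketch with hypothetical names -- the real implementation would live in Flink/Kafka Streams keyed state:

```python
from collections import defaultdict
import heapq

class ShadowStateStore:
    """Apply events to shadow state strictly in per-key sequence order.
    Out-of-order arrivals are buffered in a min-heap until the gap closes."""
    def __init__(self):
        self.state = {}                      # partition_key -> latest payload
        self.next_seq = defaultdict(int)     # partition_key -> next expected seq
        self.pending = defaultdict(list)     # partition_key -> heap of (seq, payload)

    def accept(self, key, seq, payload):
        """Buffer the event; apply it (and any unblocked successors) in order.
        Returns how many events were applied by this call."""
        heapq.heappush(self.pending[key], (seq, payload))
        applied = 0
        while self.pending[key] and self.pending[key][0][0] == self.next_seq[key]:
            _, p = heapq.heappop(self.pending[key])
            self.state[key] = p
            self.next_seq[key] += 1
            applied += 1
        return applied
```

The BackfillService plays the role of the initial gap-free bootstrap: it guarantees sequence 0..N exists for each key before live consumption takes over.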

11. Appendix: File Reference Map

Core Streaming Infrastructure (The Primary Needles)

data/src/main/java/com/duetto/model/mongo/dml/SaveHooks.java
    -> Base hook class: preBulkInsert, preBulkUpdate, postModification

data/src/main/java/com/duetto/model/mongo/dml/CompositeSaveHooks.java
    -> Chains multiple hooks per entity

data/src/main/java/com/duetto/model/hooks/GcStreamOperationEventUtils.java
    -> publishToStream() -- the thin needle insertion point
    -> allowedToStream() -- filters GC_STREAM_DISABLE_STREAMING

data/src/main/java/com/duetto/tools/eventpublisher/impl/KinesisEventPublisherService.java
    -> KPL batching (100/UUID partition key)
    -> Metrics: monolith_kinesis_event_publisher_total

data/src/main/java/com/duetto/model/company/CompanyPermission.java
    -> GC_STREAM_DISABLE_STREAMING: opt-out for specific companies

Needles Ready to Activate

data/src/main/java/com/duetto/model/hooks/BlockHeaderSaveHooks.java
    -> Kinesis COMMENTED OUT -- uncomment for DP-338 (~7 lines)
    -> BlockHeaderTransformer already registered in gcdb-stream-processor

api/src/main/java/com/duetto/model/hooks/BlockDenormChangeHooks.java
    -> Kinesis COMMENTED OUT -- uncomment for DP-295 (~7 lines)
    -> BlockDenormTransformer already registered in gcdb-stream-processor

gcdb-stream-processor (The Anti-Corruption Layer)

gcdb-stream-processor/.../consumer/GcEventProcessor.java
    -> TRANSFORM_MODEL_CLZ_BY_EVENT_CLZ: 22 entity types registered
    -> Route new entity type: add one map entry
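The "one map entry" routing pattern can be illustrated in Python with placeholder names (the real registry is the Java TRANSFORM_MODEL_CLZ_BY_EVENT_CLZ map; these classes and event names are invented for the sketch):

```python
# Illustrative analogue of the gcdb-stream-processor registry:
# adding a needle for a new entity type is a single new entry.
class SegmentTransformer:
    def accept(self, company_id, events):
        return [{"company": company_id, "entity": "Segment", "event": e}
                for e in events]

TRANSFORMERS = {
    "SegmentEvent": SegmentTransformer(),
    # "HotelEvent": HotelTransformer(),   # <- new entity type = one new entry
}

def route(event_type, company_id, events):
    transformer = TRANSFORMERS.get(event_type)
    if transformer is None:
        raise KeyError(f"no transformer registered for {event_type}")
    return transformer.accept(company_id, events)
```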

gcdb-stream-processor/.../consumer/transformer/v1/EntityTransformer.java
    -> Interface: accept(companyId, eventObjectWrappers)

gcdb-stream-processor/.../publisher/GcEntityPublisher.java
    -> FirehoseClient: max 500 records/batch, 4MiB, 50 retries

gcdb-stream-processor/.../backfill/BackfillService.java
    -> POST /internal/api/v1/backfill/initiate
    -> BackfillParameters: {entityType, hotelIds, companyIds, startDay, endDay, rps}
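A backfill initiation request is a simple JSON POST against the endpoint above. A hedged sketch that builds the body using the documented BackfillParameters field names (the validation rule is an assumption; the actual HTTP call is left to the caller):

```python
import json

BACKFILL_PATH = "/internal/api/v1/backfill/initiate"

def build_backfill_request(entity_type, hotel_ids, company_ids,
                           start_day, end_day, rps):
    """Build (path, JSON body) for a BackfillService initiate call.
    Field names follow the BackfillParameters shape documented above."""
    if start_day > end_day:
        raise ValueError("start_day must not be after end_day")
    body = {
        "entityType": entity_type,
        "hotelIds": hotel_ids,
        "companyIds": company_ids,
        "startDay": start_day,
        "endDay": end_day,
        "rps": rps,   # rate limit so backfill stays unobtrusive
    }
    return BACKFILL_PATH, json.dumps(body)
```

The rps knob matters for the vampire posture: backfill load on the monolith's database is throttled so extraction never shows up in production latency graphs.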

Before/After Extraction Targets (New Needles)

etl/src/main/java/com/duetto/etl/integration/IntegrationMessageDoc.java
    -> Add preBulkInsert hook -> raw payload -> S3 before ETL processing

api/src/main/java/com/duetto/pricing/input/RulesDataQueryV2.java
    -> Add cube publication after parallel query assembly
    -> CubeSnapshot -> S3 (not Kinesis -- cubes are large)

GraphQL (Migration Targets)

frontend/src/main/resources/schema.graphqls              (8,067 lines)
    -> Add type definitions for each migrating REST controller

api/src/main/java/com/duetto/api/graphql/               # Resolver implementations
    -> Add one resolver class per migrating controller

frontend/src/main/java/com/duetto/frontend/app/controller/
    -> 18 REST controllers to migrate -> deprecate after dual-run

Data Lake Export (Batch Extractions)

data-export/src/main/java/com/duetto/dataexport/exporters/
    -> 20 Parquet exporters -> S3
    -> Covers: Booking, Inventory, Forecast (dense/constrained/unconstrained),
      RatePushResult, RoomTypeCapacity, Shopping, Demand360, Block, Folio,
      Enterprise metrics, PriceratorStates

data-export/src/main/java/com/duetto/dataexport/export/ExportProcessor.java
    -> Thread pool; ExportEnqueuer -> ExportProcessor -> S3
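The ExportEnqueuer -> ExportProcessor -> S3 flow is a queue drained by a thread pool. A self-contained Python sketch of the shape (exporter callables and the in-memory "upload" stand in for the real Parquet-to-S3 writers):

```python
from concurrent.futures import ThreadPoolExecutor
import queue

def run_export(exporters, max_workers=4):
    """Enqueue exporter callables, drain them with a thread pool, and
    collect the S3 keys each one (hypothetically) wrote."""
    q = queue.Queue()
    for e in exporters:
        q.put(e)                              # ExportEnqueuer side
    results = []
    def worker():
        while True:
            try:
                export = q.get_nowait()       # ExportProcessor side
            except queue.Empty:
                return
            results.append(export())          # real code: write Parquet to S3
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(max_workers):
            pool.submit(worker)
    return sorted(results)
```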

Shadow Data Lake S3 Layout

s3://shadow-data-lake/
+-- before/
|   +-- pms_ingest_before/
|   |   +-- OPERA/    hotel=hotel-123/2026-02-28/msg-uuid.bin    <- raw OTA XML
|   |   +-- APALEO/   hotel=hotel-456/2026-02-28/msg-uuid.bin
|   |   +-- OHIP/     ...
|   +-- cube_assembly_before/
|       +-- hotel=hotel-123/2026-02-28/cube-uuid.json            <- entity snapshots
+-- after/
|   +-- entity_write/
|   |   +-- Segment/       hotel=hotel-123/2026-02-28/evt-uuid.json
|   |   +-- InventoryLatest/ hotel=hotel-123/2026-02-28/evt-uuid.json
|   |   +-- PricingRule/   ...
|   +-- cube_assembly_after/
|       +-- hotel=hotel-123/2026-02-28/cube-uuid.json            <- assembled cube
+-- push/
    +-- rate_push_after/
        +-- hotel=hotel-123/2026-02-28/push-uuid.json            <- push result
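Writers on both the before and after sides should build keys from one helper so the partitioning stays consistent. A sketch that reproduces the layout above (the function name and signature are illustrative):

```python
def shadow_key(stage, hotel_id, day, object_id, pms=None, entity=None, ext="json"):
    """Build an S3 object key matching the shadow-data-lake layout.
    `stage` is a directory path from the tree above, e.g.
    "before/pms_ingest_before" or "after/entity_write"."""
    parts = [stage]
    if pms:
        parts.append(pms)          # PMS vendor partition (OPERA, APALEO, ...)
    if entity:
        parts.append(entity)       # entity-type partition (Segment, ...)
    parts.append(f"hotel={hotel_id}")
    parts.append(day)
    parts.append(f"{object_id}.{ext}")
    return "/".join(parts)
```

Keeping hotel= as an explicit Hive-style partition makes the per-hotel divergence queries and per-hotel traffic shifting cheap in Athena.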

Divergence Check Query (Athena)

-- Divergence check: shadow output vs monolith output for same hotel/date
SELECT
    m.hotel_id,
    m.entity_id,
    m.segment_id,
    m.yield_value    AS monolith_yield,
    s.yield_value    AS shadow_yield,
    ABS(m.yield_value - s.yield_value) AS delta
FROM monolith_cube_output m
JOIN shadow_cube_output s
  ON m.hotel_id = s.hotel_id
 AND m.cube_date = s.cube_date
 AND m.segment_id = s.segment_id
 AND ABS(m.extracted_at_epoch - s.extracted_at_epoch) < 10000  -- 10s window
WHERE ABS(m.yield_value - s.yield_value) > 0.01  -- flag absolute divergence above the 0.01 tolerance

Supporting Documents