Consumer Guide

Implementing the receiver role

Audience: Integrators, application engineers. Use: Implement receiver-side fetching, confirmation, and idempotent processing correctly.

Consumer Role

A consumer is any system that retrieves and processes facts from BSFG by calling FetchFacts and ConfirmReceipt.

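Consumers are responsible for fetching facts, processing them idempotently, persisting outcomes durably, and confirming receipt only after that persistence succeeds.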

Required Call Sequence

The canonical consumer sequence is:

1. FetchFacts(consumer_name, limit)
   ↓
   [Receive batch of facts with offsets]
   ↓
2. FOR each fact in batch:
     a) apply_business_logic(fact)  [idempotent]
     b) persist_outcome_to_storage()
     c) if error: DO NOT CONFIRM, wait for next fetch
   ↓
3. ConfirmReceipt(consumer_name, highest_offset)
   ↓
   [Cursor advances; next fetch starts from confirmed offset + 1]
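The sequence above can be sketched in Python as a single fetch/process/confirm cycle. This is a minimal illustration, not the BSFG client: `InMemoryBoundary`, `Fact`, and `run_once` are hypothetical names, and the sketch assumes offsets start at 1 and that a cursor of 0 means nothing has been confirmed yet.

```python
from dataclasses import dataclass, field


@dataclass
class Fact:
    offset: int
    payload: dict


@dataclass
class InMemoryBoundary:
    """Hypothetical stand-in for the boundary; not the real BSFG client."""
    log: list
    cursors: dict = field(default_factory=dict)

    def fetch_facts(self, consumer_name, limit):
        # Resume from the offset after the last confirmed one.
        start = self.cursors.get(consumer_name, 0) + 1
        return [f for f in self.log if f.offset >= start][:limit]

    def confirm_receipt(self, consumer_name, offset):
        self.cursors[consumer_name] = offset
        return {"cursor_advanced_to": offset}


def run_once(boundary, consumer_name, limit, handle):
    """Run one cycle. Returns the confirmed offset, or None if nothing was confirmed."""
    facts = boundary.fetch_facts(consumer_name, limit)
    if not facts:
        return None
    try:
        for fact in facts:
            handle(fact)  # assumed to be idempotent and to persist its outcome
    except Exception:
        return None  # no confirmation, so the same batch is fetched again
    boundary.confirm_receipt(consumer_name, facts[-1].offset)
    return facts[-1].offset
```

Because the cursor moves only in `confirm_receipt`, a failure anywhere in the batch leaves the next fetch starting at the same position.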

Step 1: Fetch Facts

Pull the next batch of facts from the boundary:

FetchFacts(
  consumer_name: "plant-a-receiver",
  limit: 100
) → batch of facts with offsets

The consumer name is durable. The boundary maintains a cursor for each named consumer, tracking the last confirmed offset. Repeated calls to FetchFacts with the same consumer name will resume from the last confirmed position.

Step 2: Process Facts Idempotently

For each fact in the batch:

batch_ok = true
for fact in facts:
  try:
    // Apply business logic idempotently
    result = apply_business_logic(fact)

    // Persist the outcome durably
    // BEFORE confirming
    persist_outcome(result)

  except Error as e:
    // Do NOT confirm on error
    // Log the error
    log_error(e, fact)
    // Wait for next fetch (batch is incomplete, will retry)
    batch_ok = false
    break  // Exit the loop early

// Proceed to Step 3 only if batch_ok is still true

Step 3: Confirm Receipt

Only after all facts in the batch are processed and outcomes are persisted:

ConfirmReceipt(
  consumer_name: "plant-a-receiver",
  offset: 42  // highest offset from the batch
) → Confirmation: {cursor_advanced_to: 42}

The confirmation advances the boundary's cursor. On the next FetchFacts call, the batch will start from offset 43.

Idempotent Processing

Idempotency is required. Facts may arrive more than once due to:

Idempotency Strategy 1: Unique Message ID

Store a record of processed message_id values. Before processing, check if already processed:

if (already_processed(fact.envelope.message_id)) {
  // Idempotent retry — skip processing
  return SUCCESS
}

// Process fact
result = apply_business_logic(fact)
persist_outcome(result)

// Record that this message_id was processed
mark_as_processed(fact.envelope.message_id)

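The deduplication table should use a TTL at least as long as the boundary's fact retention (default 7 days), so that a deduplication record does not expire while its fact can still be redelivered.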
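As an illustration of the message-ID strategy, the sketch below uses Python's built-in `sqlite3` module and records the message ID and the outcome in one transaction, so a crash cannot leave one without the other. The table names, `open_store`, and `process_once` are assumptions made for this example; TTL cleanup is left out.

```python
import sqlite3


def open_store(path=":memory:"):
    """Create the hypothetical dedup and outcome tables."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS processed (message_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE IF NOT EXISTS outcomes (message_id TEXT, result TEXT)")
    conn.commit()
    return conn


def process_once(conn, message_id, apply_business_logic):
    """Return True if the fact was processed, False if it was a duplicate."""
    with conn:  # commits on success, rolls back if an exception escapes
        try:
            # The primary key rejects a message_id that was already recorded.
            conn.execute(
                "INSERT INTO processed (message_id) VALUES (?)", (message_id,)
            )
        except sqlite3.IntegrityError:
            return False  # idempotent retry: skip processing
        result = apply_business_logic()
        conn.execute(
            "INSERT INTO outcomes (message_id, result) VALUES (?, ?)",
            (message_id, result),
        )
    return True
```

If the business logic raises, the transaction rolls back and the message ID is not recorded, so the fact is processed again on the next fetch.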

Idempotency Strategy 2: Natural Key

If the business logic has a natural key (entity ID + operation), upsert instead of insert:

// Idempotent upsert using the fact's business key
entity_key = fact.subject + "_" + fact.predicate
business_key = extract_key_from(fact.object_json)

result = upsert(entity_key, business_key, fact.object_json)
// First call creates; subsequent calls update-or-skip

persist_outcome(result)
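One way to realize the upsert is SQLite's `INSERT ... ON CONFLICT DO UPDATE`, available in SQLite 3.24 and later. The following Python sketch assumes a hypothetical `entities` table keyed by the pair (entity_key, business_key); the schema and the `upsert_fact` helper are illustrative, not part of BSFG.

```python
import json
import sqlite3


def open_entities(path=":memory:"):
    """Create the hypothetical entities table with a composite natural key."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS entities ("
        " entity_key TEXT NOT NULL,"
        " business_key TEXT NOT NULL,"
        " body TEXT NOT NULL,"
        " PRIMARY KEY (entity_key, business_key))"
    )
    conn.commit()
    return conn


def upsert_fact(conn, subject, predicate, business_key, object_json):
    """Insert the row on first delivery; overwrite it on any replay."""
    entity_key = f"{subject}_{predicate}"
    with conn:
        conn.execute(
            "INSERT INTO entities (entity_key, business_key, body) "
            "VALUES (?, ?, ?) "
            "ON CONFLICT (entity_key, business_key) "
            "DO UPDATE SET body = excluded.body",
            (entity_key, business_key, json.dumps(object_json, sort_keys=True)),
        )
```

Replaying the same fact leaves exactly one row with the same body, which is what makes the write safe to repeat.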

Idempotency Strategy 3: Outcome Idempotency

If the processing outcome is idempotent by nature (e.g., setting a field to a specific value), re-processing the same fact produces the same result:

// Processing is idempotent by design
outcome = execute_idempotent_operation(fact)

persist_outcome(outcome)
// If called again with the same fact, outcome is identical
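To make the distinction concrete, the Python sketch below contrasts an operation that is idempotent by nature (assigning an absolute value) with one that is not (adding a delta). The fact layout and both function names are assumptions chosen for the example.

```python
def set_status(state, fact):
    """Idempotent: assigns an absolute value taken from the fact."""
    state[fact["subject"]] = fact["object_json"]["status"]
    return state


def add_quantity(state, fact):
    """NOT idempotent: every replay adds the delta again."""
    subject = fact["subject"]
    state[subject] = state.get(subject, 0) + fact["object_json"]["delta"]
    return state
```

Only operations of the first kind are safe under this strategy; the second kind needs one of the other two strategies to guard against replays.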

Cursor Persistence and Recovery

The boundary maintains a durable cursor for each named consumer. The cursor (the last confirmed offset) is persisted and survives restarts.

On Consumer Restart

If the consumer crashes mid-batch:

  1. On restart, call FetchFacts with the same consumer name
  2. The boundary returns facts from last_confirmed_offset + 1
  3. Some facts in the new batch may have been partially processed before the crash
  4. Re-process them (idempotency handles this)
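The recovery steps above can be simulated with a small in-memory model. In this Python sketch, `Boundary` and `consume` are hypothetical, offsets are assumed to start at 1, and the outcome store is a dict keyed by offset so that re-processing overwrites instead of duplicating.

```python
class Boundary:
    """Hypothetical in-memory boundary with one cursor per consumer name."""

    def __init__(self, offsets):
        self.offsets = list(offsets)
        self.cursor = {}

    def fetch(self, name, limit):
        # Facts after the last confirmed offset.
        start = self.cursor.get(name, 0) + 1
        return [o for o in self.offsets if o >= start][:limit]

    def confirm(self, name, offset):
        self.cursor[name] = offset


def consume(boundary, name, store, crash_at=None):
    """Process one batch; raise before confirming when crash_at is reached."""
    batch = boundary.fetch(name, limit=10)
    for offset in batch:
        if offset == crash_at:
            raise RuntimeError("simulated crash mid-batch")
        store[offset] = f"outcome-{offset}"  # keyed write: replays overwrite
    if batch:
        boundary.confirm(name, batch[-1])
```

After a simulated crash, the next `consume` call receives the whole unconfirmed batch again, including the facts that were already handled before the crash.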

Confirming Only After Durability

Danger: If you confirm receipt before persisting the outcome, a crash between confirmation and persistence loses the outcome.

// ❌ WRONG ORDER:
ConfirmReceipt(...)  // cursor advances
persist_outcome(...)  // crashes before this — outcome lost!

// ✅ CORRECT ORDER:
persist_outcome(...)  // durable
ConfirmReceipt(...)   // cursor advances only after durable persist

Pull-Driven Fetch Model

Consumers pull facts from the boundary, not the other way around. This means:

Polling pattern:

while (true) {
  facts = FetchFacts(consumer_name, limit)

  if (facts.length == 0) {
    // No new facts — wait before retrying
    sleep(100ms)
    continue
  }

  // Process batch
  for fact in facts:
    // ... process idempotently ...

  // Confirm only if every fact in the batch was processed and persisted
  ConfirmReceipt(consumer_name, facts.last.offset)
}
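A Python rendering of the polling pattern might look like the sketch below. The `fetch`, `process`, and `confirm` callables stand in for the real client calls, and `max_cycles` and the injectable `sleep` are assumptions added so the loop can be exercised in a test; facts are assumed to be dicts with an `offset` key.

```python
import time


def poll(fetch, process, confirm, limit=100, idle_wait=0.1,
         max_cycles=None, sleep=time.sleep):
    """Pull batches until max_cycles is reached (None means run forever)."""
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        cycles += 1
        facts = fetch(limit)
        if not facts:
            sleep(idle_wait)  # no new facts: wait instead of busy-looping
            continue
        try:
            for fact in facts:
                process(fact)  # assumed idempotent and durable
        except Exception:
            sleep(idle_wait)  # cursor untouched, so the batch is fetched again
            continue
        confirm(facts[-1]["offset"])  # highest offset in the batch
```

Confirmation sits after the processing loop and outside the `except` branch, so a failed batch never advances the cursor.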

Handling Artifacts

If a fact references an artifact:

Extract Reference Metadata

artifact_ref = fact.object_json  // or extract specific fields

bucket = artifact_ref["bucket"]
key = artifact_ref["key"]
digest = artifact_ref["digest"]
media_type = artifact_ref["media_type"]

Retrieve Artifact

artifact_blob = GetObject(bucket, key)

Verify Integrity (Optional)

computed_digest = SHA256(artifact_blob)
if (computed_digest != digest) {
  throw IntegrityError("Artifact digest mismatch")
}
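In Python the same check can be written with the standard `hashlib` module. The sketch assumes the reference carries the digest as a hexadecimal SHA-256 string, optionally prefixed with `sha256:`; the actual encoding used by a given deployment is not specified here and should be confirmed. It requires Python 3.9 or later for `str.removeprefix`.

```python
import hashlib


class IntegrityError(Exception):
    """Raised when an artifact does not match its recorded digest."""


def verify_artifact(blob: bytes, expected_digest: str) -> None:
    # Assumption: hex-encoded SHA-256, with an optional "sha256:" prefix.
    expected = expected_digest.lower().removeprefix("sha256:")
    computed = hashlib.sha256(blob).hexdigest()
    if computed != expected:
        raise IntegrityError("Artifact digest mismatch")
```

A caller would run `verify_artifact(artifact_blob, digest)` right after retrieval and treat `IntegrityError` as a processing failure, which means the batch is not confirmed.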

Retry on Unavailability

If GetObject fails with "not found," retry before treating it as permanent:

max_retries = 10
backoff = 100ms  // initial delay; doubled after each failed attempt
retry_count = 0

while (retry_count < max_retries) {
  try:
    artifact_blob = GetObject(bucket, key)
    // Success — process artifact
    break
  except NotFound:
    retry_count++
    if (retry_count == max_retries) {
      throw ArtifactMissingError("Artifact not found after retries")
    }
    wait(backoff)
    backoff *= 2
}
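The retry loop above translates to Python roughly as follows. `NotFound`, `ArtifactMissingError`, and the `get_object` callable are placeholders for whatever the object-store client actually provides, and the `sleep` parameter is injectable only to keep the example testable.

```python
import time


class NotFound(Exception):
    """Placeholder for the object store's 'not found' error."""


class ArtifactMissingError(Exception):
    """Raised when the artifact is still missing after all retries."""


def get_with_retry(get_object, bucket, key, max_retries=10,
                   base_delay=0.1, sleep=time.sleep):
    """Call get_object up to max_retries times, doubling the wait each time."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            return get_object(bucket, key)
        except NotFound:
            if attempt == max_retries:
                raise ArtifactMissingError(
                    "Artifact not found after retries"
                ) from None
            sleep(delay)
            delay *= 2  # 0.1 s, 0.2 s, 0.4 s, ...
```

With the defaults, the waits between the ten attempts add up to roughly 51 seconds (0.1 s doubled nine times) before the error is treated as permanent; adding jitter or a cap on the delay is a common refinement.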

Handling Delayed or Replayed Facts

Facts may arrive after a long delay (hours or days) due to:

Treat delayed facts the same as fresh facts. Process them idempotently. Do not reject them based on arrival time.

Performance Characteristics

Under normal operation:

Batch size tuning:

Error Handling

Processing Error (Business Logic Fails)

If apply_business_logic throws an error:

Persistence Error (Database Fails)

If persist_outcome throws an error:

Artifact Unavailable

If GetObject fails with a transient error (timeout, temporary unavailability):