Audience: Integrators, application engineers. Use: Implement receiver-side fetching, confirmation, and idempotent processing correctly.
Consumer Role
A consumer is any system that retrieves and processes facts from BSFG by calling FetchFacts and ConfirmReceipt.
Consumers are responsible for:
- Pull-driven fact retrieval using durable named consumers
- Idempotent processing (facts may arrive more than once)
- Persisting business outcomes before confirmation
- Confirming receipt only after durable handling
Required Call Sequence
The canonical consumer sequence is:
1. FetchFacts(consumer_name, limit)
↓
[Receive batch of facts with offsets]
↓
2. FOR each fact in batch:
a) apply_business_logic(fact) [idempotent]
b) persist_outcome_to_storage()
c) if error: DO NOT CONFIRM, wait for next fetch
↓
3. ConfirmReceipt(consumer_name, highest_offset)   [only if every fact in the batch succeeded]
↓
[Cursor advances; next fetch starts from confirmed offset + 1]
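The call sequence above can be sketched in Python against an in-memory stand-in for the boundary. `InMemoryBoundary`, `fetch_facts`, `confirm_receipt`, `consume_one_batch`, and the `Fact` fields are hypothetical names chosen for illustration, not the BSFG client API. Offsets are assumed to start at 1 and to be returned in order.

```python
from dataclasses import dataclass


@dataclass
class Fact:
    offset: int
    message_id: str
    payload: str


class InMemoryBoundary:
    """Hypothetical stand-in for the boundary (not the real BSFG client)."""

    def __init__(self, facts):
        self.facts = sorted(facts, key=lambda f: f.offset)
        self.cursors = {}  # consumer_name -> last confirmed offset (0 = nothing confirmed)

    def fetch_facts(self, consumer_name, limit):
        last = self.cursors.get(consumer_name, 0)
        return [f for f in self.facts if f.offset > last][:limit]

    def confirm_receipt(self, consumer_name, offset):
        self.cursors[consumer_name] = offset
        return {"cursor_advanced_to": offset}


def consume_one_batch(boundary, consumer_name, limit, handle):
    """Steps 1-3: fetch, process every fact, confirm only if the whole batch succeeded."""
    facts = boundary.fetch_facts(consumer_name, limit)       # 1. FetchFacts
    if not facts:
        return None
    for fact in facts:                                       # 2. process each fact
        try:
            handle(fact)  # must be idempotent and must persist its outcome durably
        except Exception as exc:
            print(f"offset {fact.offset} failed: {exc}; not confirming")
            return None   # cursor unchanged, so the batch is fetched again
    highest = max(f.offset for f in facts)
    return boundary.confirm_receipt(consumer_name, highest)  # 3. ConfirmReceipt


boundary = InMemoryBoundary([Fact(i, f"m-{i}", "{}") for i in range(1, 6)])
print(consume_one_batch(boundary, "plant-a-receiver", 3, lambda fact: None))
# {'cursor_advanced_to': 3}
```

Returning early on the first failure leaves the cursor where it was, so the next fetch delivers the same batch again; that is why each `handle` call has to be idempotent.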
Step 1: Fetch Facts
Pull the next batch of facts from the boundary:
FetchFacts(
consumer_name: "plant-a-receiver",
limit: 100
) → batch of facts with offsets
The consumer name is durable. The boundary maintains a cursor for each named consumer, tracking the last confirmed offset. Repeated calls to FetchFacts with the same consumer name will resume from the last confirmed position.
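A minimal model of this cursor behaviour, assuming offsets start at 1 and each fetch returns facts in offset order. The class and the second consumer name `audit-receiver` are hypothetical; the cursor map here is an ordinary dict, whereas the boundary persists it durably.

```python
class InMemoryBoundary:
    """Hypothetical model of per-consumer cursors (not the BSFG API)."""

    def __init__(self, offsets):
        self.offsets = sorted(offsets)  # offsets of stored facts
        self.cursors = {}               # consumer_name -> last confirmed offset (0 = none)

    def fetch_facts(self, consumer_name, limit):
        # Resume strictly after this consumer's last confirmed offset.
        last = self.cursors.get(consumer_name, 0)
        return [o for o in self.offsets if o > last][:limit]

    def confirm_receipt(self, consumer_name, offset):
        self.cursors[consumer_name] = offset


boundary = InMemoryBoundary(range(1, 11))
print(boundary.fetch_facts("plant-a-receiver", 3))  # [1, 2, 3]
print(boundary.fetch_facts("plant-a-receiver", 3))  # [1, 2, 3] again: nothing confirmed yet
boundary.confirm_receipt("plant-a-receiver", 3)
print(boundary.fetch_facts("plant-a-receiver", 3))  # [4, 5, 6]
print(boundary.fetch_facts("audit-receiver", 3))    # [1, 2, 3]: separate cursor
```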
Step 2: Process Facts Idempotently
For each fact in the batch:
batch_ok = true
for fact in facts:
    try:
        // Apply business logic idempotently
        result = apply_business_logic(fact)
        // Persist the outcome durably BEFORE confirming
        persist_outcome(result)
    except Error as e:
        // Do NOT confirm on error; log the error with the fact
        log_error(e, fact)
        // Stop here: the batch is incomplete and will be fetched again
        batch_ok = false
        break  // Exit the loop early
// Confirm (Step 3) only if batch_ok is still true, i.e. every fact was processed
Step 3: Confirm Receipt
Only after all facts in the batch are processed and outcomes are persisted:
ConfirmReceipt(
consumer_name: "plant-a-receiver",
offset: 42 // highest offset from the batch
) → Confirmation: {cursor_advanced_to: 42}
The confirmation advances the boundary's cursor. On the next FetchFacts call, the batch will start from offset 43.
Idempotent Processing
Idempotency is required. Facts may arrive more than once due to:
- Producer retry with the same message_id
- Boundary replay after reconnection
- Consumer restart and recovery from last confirmed offset
Idempotency Strategy 1: Unique Message ID
Store a record of processed message_id values. Before processing, check if already processed:
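if (already_processed(fact.envelope.message_id)) {
    // Idempotent retry: skip processing
    return SUCCESS
}
// Process fact
result = apply_business_logic(fact)
persist_outcome(result)
// Record that this message_id was processed. Ideally write this marker in the
// same transaction as persist_outcome, so a crash cannot leave an outcome
// without its marker (which would let the fact be applied twice).
mark_as_processed(fact.envelope.message_id)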
The deduplication table can use a TTL equal to the boundary's fact retention (default 7 days).
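Strategy 1 might look like this in Python. `DedupStore` is a hypothetical in-memory store and the `fact["envelope"]["message_id"]` shape is assumed; a real deployment would keep the markers in the consumer's database, ideally written in the same transaction as the business outcome. The injectable `clock` exists only to make expiry testable.

```python
import time


class DedupStore:
    """Hypothetical processed-message_id store with expiry (in memory for illustration)."""

    def __init__(self, ttl_seconds=7 * 24 * 3600, clock=time.time):
        self.ttl = ttl_seconds  # default mirrors the 7-day retention mentioned above
        self.clock = clock
        self.expires_at = {}    # message_id -> expiry timestamp

    def already_processed(self, message_id):
        expiry = self.expires_at.get(message_id)
        if expiry is None:
            return False
        if expiry <= self.clock():
            del self.expires_at[message_id]  # expired: forget the entry
            return False
        return True

    def mark_as_processed(self, message_id):
        self.expires_at[message_id] = self.clock() + self.ttl


def handle(fact, store, apply_and_persist):
    message_id = fact["envelope"]["message_id"]
    if store.already_processed(message_id):
        return "SKIPPED"          # idempotent retry
    apply_and_persist(fact)       # business logic + durable outcome
    store.mark_as_processed(message_id)
    return "PROCESSED"


store = DedupStore()
fact = {"envelope": {"message_id": "m-1"}}
print(handle(fact, store, lambda f: None))  # PROCESSED
print(handle(fact, store, lambda f: None))  # SKIPPED
```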
Idempotency Strategy 2: Natural Key
If the business logic has a natural key (entity ID + operation), upsert instead of insert:
// Idempotent upsert using the fact's business key
entity_key = fact.subject + "_" + fact.predicate
business_key = extract_key_from(fact.object_json)
result = upsert(entity_key, business_key, fact.object_json)
// First call creates; subsequent calls update-or-skip
persist_outcome(result)
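One way to realise the upsert is SQLite's `INSERT ... ON CONFLICT DO UPDATE` (SQLite 3.24 or later), shown here with Python's built-in `sqlite3` module. The `outcomes` table layout, the fact dict shape, and the `order_id` field used as the business key are assumptions for illustration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE outcomes (
           entity_key   TEXT NOT NULL,
           business_key TEXT NOT NULL,
           body         TEXT NOT NULL,
           PRIMARY KEY (entity_key, business_key)
       )"""
)


def upsert_fact(conn, fact):
    entity_key = fact["subject"] + "_" + fact["predicate"]
    # Hypothetical: the business key lives in an "order_id" field of object_json.
    business_key = str(json.loads(fact["object_json"])["order_id"])
    with conn:  # commit on success, roll back on exception
        conn.execute(
            """INSERT INTO outcomes (entity_key, business_key, body)
               VALUES (?, ?, ?)
               ON CONFLICT (entity_key, business_key)
               DO UPDATE SET body = excluded.body""",
            (entity_key, business_key, fact["object_json"]),
        )


fact = {"subject": "line-7", "predicate": "order_released",
        "object_json": '{"order_id": 42, "qty": 5}'}
upsert_fact(conn, fact)
upsert_fact(conn, fact)  # redelivery updates the same row instead of inserting a duplicate
print(conn.execute("SELECT COUNT(*) FROM outcomes").fetchone()[0])  # 1
```

Because the primary key is the natural key, the database itself enforces "at most one row per entity", so no separate deduplication table is needed.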
Idempotency Strategy 3: Outcome Idempotency
If the processing outcome is idempotent by nature (e.g., setting a field to a specific value), re-processing the same fact produces the same result:
// Processing is idempotent by design
outcome = execute_idempotent_operation(fact)
persist_outcome(outcome)
// If called again with the same fact, outcome is identical
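The difference between an operation that is idempotent by design and one that is not shows up in a few lines of Python. The field names (`machine`, `status`, `delta`) are hypothetical.

```python
def set_status(state, fact):
    """Idempotent: assigns an absolute value carried by the fact."""
    state[fact["machine"]] = fact["status"]


def add_count(state, fact):
    """NOT idempotent: replaying the fact adds the delta again."""
    state[fact["machine"]] = state.get(fact["machine"], 0) + fact["delta"]


fact_a = {"machine": "press-3", "status": "RUNNING"}
s = {}
set_status(s, fact_a)
set_status(s, fact_a)  # redelivered
print(s)  # {'press-3': 'RUNNING'}

fact_b = {"machine": "press-3", "delta": 5}
c = {}
add_count(c, fact_b)
add_count(c, fact_b)  # redelivered
print(c)  # {'press-3': 10}: double counted
```

If an operation is naturally a delta, it does not qualify for this strategy; either have the fact carry the absolute value (for example the new total) or fall back to Strategy 1.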
Cursor Persistence and Recovery
The boundary maintains a durable cursor for each named consumer. The cursor (the last confirmed offset) is persisted and survives restarts.
On Consumer Restart
If the consumer crashes mid-batch:
- On restart, call FetchFacts with the same consumer name
- The boundary returns facts starting from last_confirmed_offset + 1
- Some facts in the new batch may already have been processed, fully or partially, before the crash
- Re-process them (idempotency handles this)
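The recovery path can be simulated end to end. Everything here is hypothetical: `Boundary` is an in-memory stand-in, `Crash` simulates the process dying, and the `processed_ids` set and `applied` list stand in for durable storage that survives the restart. The consumer dies before handling offset 3, restarts, and receives offsets 1 to 3 again; the message_id check keeps offsets 1 and 2 from being applied twice.

```python
class Boundary:
    """Hypothetical in-memory boundary with one cursor per consumer name."""

    def __init__(self, facts):
        self.facts = facts   # list of {"offset": int, "message_id": str}, in offset order
        self.cursors = {}

    def fetch_facts(self, name, limit):
        last = self.cursors.get(name, 0)
        return [f for f in self.facts if f["offset"] > last][:limit]

    def confirm_receipt(self, name, offset):
        self.cursors[name] = offset


class Crash(Exception):
    """Simulates the consumer process dying mid-batch."""


def run_consumer(boundary, processed_ids, applied, crash_before_index=None):
    """One consumer lifetime; processed_ids and applied stand in for durable storage."""
    batch = boundary.fetch_facts("plant-a-receiver", 100)
    for i, fact in enumerate(batch):
        if i == crash_before_index:
            raise Crash()
        if fact["message_id"] in processed_ids:
            continue                              # handled before the crash: skip
        applied.append(fact["offset"])            # business outcome
        processed_ids.add(fact["message_id"])     # dedup marker (write with the outcome)
    if batch:
        boundary.confirm_receipt("plant-a-receiver", batch[-1]["offset"])


boundary = Boundary([{"offset": i, "message_id": f"m-{i}"} for i in (1, 2, 3)])
processed_ids, applied = set(), []
try:
    run_consumer(boundary, processed_ids, applied, crash_before_index=2)  # dies before offset 3
except Crash:
    pass
print(boundary.cursors)  # {}: nothing confirmed, so offsets 1-3 are fetched again
run_consumer(boundary, processed_ids, applied)  # restart with the same consumer name
print(applied)           # [1, 2, 3]: each fact applied exactly once
print(boundary.cursors)  # {'plant-a-receiver': 3}
```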
Confirming Only After Durability
Danger: If you confirm receipt before persisting the outcome, a crash between confirmation and persistence loses the outcome: the cursor has already moved past the fact, so the boundary will not deliver it again.
// ❌ WRONG ORDER:
ConfirmReceipt(...) // cursor advances
persist_outcome(...) // crashes before this — outcome lost!
// ✅ CORRECT ORDER:
persist_outcome(...) // durable
ConfirmReceipt(...) // cursor advances only after durable persist
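The two orderings can be compared with a small simulation that crashes after the first step. `handle_then_crash` and `restart` are hypothetical helpers; `state["cursor"]` plays the role of the boundary's confirmed offset and `state["outcomes"]` the consumer's durable storage.

```python
class Crash(Exception):
    """Simulated process death between the two steps."""


def handle_then_crash(state, offset, first_step):
    """Run only the first step for this fact, then 'crash'."""
    if first_step == "confirm":
        state["cursor"] = offset              # ConfirmReceipt
    else:
        state["outcomes"].add(offset)         # persist_outcome
    raise Crash()


def restart(state, offset):
    """After restart the boundary redelivers only facts beyond the cursor."""
    if offset > state["cursor"]:
        state["outcomes"].add(offset)         # idempotent re-processing
        state["cursor"] = offset              # confirm after persisting


for first_step in ("confirm", "persist"):
    state = {"cursor": 41, "outcomes": set()}
    try:
        handle_then_crash(state, 42, first_step)
    except Crash:
        pass
    restart(state, 42)
    print(first_step, "first: outcome kept =", 42 in state["outcomes"])
# confirm first: outcome kept = False
# persist first: outcome kept = True
```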
Pull-Driven Fetch Model
Consumers pull facts from the boundary, not the other way around. This means:
- Consumers control fetch pace. They can batch, throttle, or pause as needed without affecting the boundary.
- Consumers are responsible for advancing their own cursor by confirming receipt. The boundary stores each named consumer's cursor but does not push facts to consumers.
- End-to-end delivery latency depends on how often the consumer polls and how fast it processes facts, not on the boundary.
Polling pattern:
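while (true) {
    facts = FetchFacts(consumer_name, limit)
    if (facts.length == 0) {
        // No new facts: wait before retrying
        sleep(100ms)
        continue
    }
    // Process batch (Step 2); on the first error, log it, set batch_ok = false, and stop
    batch_ok = true
    for fact in facts:
        // ... process idempotently ...
    // Confirm only if every fact in the batch was processed and persisted
    if (batch_ok) {
        ConfirmReceipt(consumer_name, facts.last.offset)
    }
}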
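A runnable version of the polling pattern, again against a hypothetical in-memory boundary. The `max_idle_polls` parameter is an addition so the demo terminates (a real consumer would loop forever), facts are represented by their offsets, and `print` stands in for idempotent processing.

```python
import time


class InMemoryBoundary:
    """Hypothetical stand-in for FetchFacts/ConfirmReceipt (not the real client)."""

    def __init__(self, offsets):
        self.offsets = sorted(offsets)
        self.cursors = {}

    def fetch_facts(self, name, limit):
        last = self.cursors.get(name, 0)
        return [o for o in self.offsets if o > last][:limit]

    def confirm_receipt(self, name, offset):
        self.cursors[name] = offset


def poll(boundary, name, limit, process, idle_sleep=0.1, max_idle_polls=None):
    """Pull loop; max_idle_polls (hypothetical) stops after that many empty fetches in a row."""
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        facts = boundary.fetch_facts(name, limit)
        if not facts:
            idle += 1
            time.sleep(idle_sleep)  # no new facts: wait before polling again
            continue
        idle = 0
        try:
            for fact in facts:
                process(fact)       # idempotent; persists its outcome
        except Exception:
            time.sleep(idle_sleep)  # do not confirm; the same batch is fetched again
            continue
        boundary.confirm_receipt(name, facts[-1])  # highest offset (facts are in order)


boundary = InMemoryBoundary(range(1, 6))
poll(boundary, "plant-a-receiver", 2, print, idle_sleep=0.0, max_idle_polls=1)
print(boundary.cursors)  # {'plant-a-receiver': 5}
```

On a processing error the loop refetches without confirming, so a failing batch is retried until it succeeds; the dead-letter queue suggested under Error Handling is one way to keep a single bad fact from blocking the consumer indefinitely.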
Handling Artifacts
If a fact references an artifact:
Extract Reference Metadata
artifact_ref = fact.object_json // or extract specific fields
bucket = artifact_ref["bucket"]
key = artifact_ref["key"]
digest = artifact_ref["digest"]
media_type = artifact_ref["media_type"]
Retrieve Artifact
artifact_blob = GetObject(bucket, key)
Verify Integrity (Optional)
computed_digest = SHA256(artifact_blob)
if (computed_digest != digest) {
throw IntegrityError("Artifact digest mismatch")
}
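The digest check can be written with Python's standard `hashlib`. This sketch assumes the `digest` field is a plain hexadecimal SHA-256 string; if the boundary encodes digests differently, the comparison needs adjusting. `verify_artifact` and `IntegrityError` are illustrative names.

```python
import hashlib


class IntegrityError(Exception):
    pass


def verify_artifact(artifact_blob: bytes, digest: str) -> None:
    """Raise IntegrityError unless SHA-256(artifact_blob) equals digest (assumed hex)."""
    computed = hashlib.sha256(artifact_blob).hexdigest()
    if computed != digest.lower():
        raise IntegrityError(
            f"Artifact digest mismatch: expected {digest}, computed {computed}"
        )


blob = b"batch-report"
verify_artifact(blob, hashlib.sha256(blob).hexdigest())  # matches: returns None
try:
    verify_artifact(blob, "0" * 64)
except IntegrityError as err:
    print(err)
```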
Retry on Unavailability
If GetObject fails with "not found," retry before treating it as permanent:
max_retries = 10
backoff = 100ms  // initial delay; doubled after each miss
retry_count = 0
while (retry_count < max_retries) {
    try:
        artifact_blob = GetObject(bucket, key)
        // Success: process artifact
        break
    except NotFound:
        retry_count++
        if (retry_count == max_retries) {
            throw ArtifactMissingError("Artifact not found after retries")
        }
        wait(backoff)
        backoff *= 2
}
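The retry loop maps onto a small Python function. `get_object` is passed in because the object-store client is not specified here; `NotFound`, `ArtifactMissingError`, and `get_with_retry` are hypothetical names, and `sleep` is injectable so tests need not wait.

```python
import time


class NotFound(Exception):
    """Hypothetical 'object not found' error from the object-store client."""


class ArtifactMissingError(Exception):
    pass


def get_with_retry(get_object, bucket, key, max_retries=10, base_delay=0.1, sleep=time.sleep):
    """Call get_object(bucket, key); on NotFound wait 0.1 s, 0.2 s, 0.4 s, ... and retry."""
    delay = base_delay
    for attempt in range(1, max_retries + 1):
        try:
            return get_object(bucket, key)
        except NotFound:
            if attempt == max_retries:
                raise ArtifactMissingError(
                    f"{bucket}/{key} not found after {max_retries} attempts"
                ) from None
            sleep(delay)
            delay *= 2


# Usage with a fake store that only has the object on the third attempt.
attempts = []


def fake_get_object(bucket, key):
    attempts.append(key)
    if len(attempts) < 3:
        raise NotFound(key)
    return b"artifact-bytes"


print(get_with_retry(fake_get_object, "plant-a", "reports/42.pdf", sleep=lambda s: None))
# b'artifact-bytes'
```

With the defaults, a permanently missing object is attempted 10 times with 9 waits totalling 0.1 s × (1 + 2 + ... + 256) = 51.1 s before `ArtifactMissingError` is raised.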
Handling Delayed or Replayed Facts
Facts may arrive after a long delay (hours or days) due to:
- Network partition and boundary replay
- Producer batching or delayed emission
- Consumer restart and recovery
Treat delayed facts the same as fresh facts. Process them idempotently. Do not reject them based on arrival time.
Performance Characteristics
Under normal operation:
- FetchFacts latency: 1–50 ms
- Processing latency per fact: application-dependent (100 ms to several seconds)
- ConfirmReceipt latency: 1–10 ms
Batch size tuning:
- Large batches (1000+): better throughput, higher memory, longer processing time per batch
- Small batches (1–10): lower latency, more RPC calls, higher overhead
- Typical: 100–500 facts per batch
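A quick calculation shows why small batches cost more in overhead. The sketch assumes a hypothetical backlog of 10,000 facts and uses the upper-bound latencies listed above (50 ms per FetchFacts, 10 ms per ConfirmReceipt); it ignores processing time and memory, which grow with batch size.

```python
import math


def batch_overhead(total_facts, limit, fetch_ms=50, confirm_ms=10):
    """RPC count and worst-case RPC time to drain total_facts (ignores empty polls and retries)."""
    batches = math.ceil(total_facts / limit)
    rpc_calls = 2 * batches                       # one FetchFacts + one ConfirmReceipt per batch
    rpc_ms = batches * (fetch_ms + confirm_ms)    # upper-bound latencies quoted above
    return rpc_calls, rpc_ms


for limit in (10, 100, 500, 1000):
    print(limit, batch_overhead(10_000, limit))
# 10 (2000, 60000)
# 100 (200, 6000)
# 500 (40, 1200)
# 1000 (20, 600)
```

Going from 10 to 100 facts per batch cuts RPC overhead tenfold (60 s to 6 s), while going from 500 to 1000 saves only 0.6 s, which fits the 100–500 range as a reasonable default.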
Error Handling
Processing Error (Business Logic Fails)
If apply_business_logic throws an error:
- Log the error with the fact ID and offset
- Do NOT confirm receipt
- Wait for the next fetch (batch is incomplete)
- On the next fetch, the boundary returns facts from the last confirmed offset + 1, so the whole unconfirmed batch, including the failing fact, is retried
- Consider a dead-letter queue for facts that fail repeatedly
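The dead-letter suggestion can be sketched as a wrapper that counts failures per `message_id` and, after a threshold, records the fact in a dead-letter store and returns normally so the batch can be confirmed. The class, `max_attempts`, and the in-memory stores are assumptions; in practice both the failure counts and the dead-letter records must be durable, since the batch is refetched after restarts.

```python
from collections import defaultdict


class DeadLetterHandler:
    """Hypothetical wrapper that parks a fact after max_attempts failures."""

    def __init__(self, process, max_attempts=5):
        self.process = process
        self.max_attempts = max_attempts
        self.failures = defaultdict(int)  # message_id -> failures (durable in practice)
        self.dead_letters = []            # stands in for a durable dead-letter store

    def handle(self, fact):
        try:
            self.process(fact)
        except Exception as exc:
            message_id = fact["message_id"]
            self.failures[message_id] += 1
            if self.failures[message_id] < self.max_attempts:
                raise  # still retryable: the caller must not confirm
            # Give up on this fact: park it with its error so the batch can proceed.
            self.dead_letters.append({"fact": fact, "error": repr(exc)})


def reject(fact):
    raise ValueError("unparseable object_json")


handler = DeadLetterHandler(reject, max_attempts=3)
for attempt in range(3):
    try:
        handler.handle({"message_id": "m-9", "offset": 57})
        print("attempt", attempt + 1, "-> dead-lettered, batch may be confirmed")
    except ValueError:
        print("attempt", attempt + 1, "-> failed, do not confirm")
```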
Persistence Error (Database Fails)
If persist_outcome throws an error:
- Do NOT confirm receipt
- Wait for recovery or manual intervention
- Once the database is healthy, call FetchFacts again to retry the batch
Artifact Unavailable
If GetObject fails with a transient error (timeout, temporary unavailability):
- Retry with exponential backoff (up to 10 retries)
- If it remains unavailable after retries, treat it as a processing error and wait for recovery