Analytics
@decoperations/s3worm-analytics is an optional middleware layer that records a UsageEvent for every PUT / GET / HEAD / LIST / DELETE / COPY call made through an S3Worm instance, broken down by tenant and per-feature action. Use it for usage dashboards, per-tenant quotas / billing, cost attribution per feature, and detection of egress hotspots.
Install
pnpm add @decoperations/s3worm-analytics
Quick start
import { S3Worm } from "@decoperations/s3worm";
import { withAnalytics, ConsoleSink } from "@decoperations/s3worm-analytics";
const worm = withAnalytics(new S3Worm({ bucket: "my-bucket", /* … */ }), {
  sink: new ConsoleSink(),
  tenant: () => "user-123",
  action: () => "files.upload",
});
await worm.create("docs/hello.json", { hi: true });
// [s3worm] PUT docs/hello.json in=12B out=0B 8ms ok tenant=user-123 action=files.upload
withAnalytics mutates the S3Worm's underlying transport in place. It returns the same instance for fluent use.
Strictly-typed alternative
When you control the construction site, wrap the transport directly:
import { S3Worm, S3Client } from "@decoperations/s3worm";
import { wrapTransport, ConsoleSink } from "@decoperations/s3worm-analytics";
const transport = wrapTransport(new S3Client(config), {
  sink: new ConsoleSink(),
});
const worm = new S3Worm({ ...config, transport });
UsageEvent
interface UsageEvent {
  op: "PUT" | "GET" | "HEAD" | "LIST" | "DELETE" | "COPY";
  bucket: string;
  key: string;
  bytesIn: number; // PUT body size
  bytesOut: number; // GET ContentLength, sum of LIST sizes, HEAD size
  durationMs: number;
  tenant?: string;
  action?: string;
  ok: boolean;
  errorCode?: string;
  timestamp: string; // ISO-8601
  extra?: Record<string, unknown>; // per-call dimensions (workflowId, runId, …)
  sampled?: boolean; // populated when AnalyticsOptions.sample is on
  sampleWeight?: number; // 1 / rate, for downstream extrapolation
  source?: "transport" | "cdn" | "public-url" | string; // event origin
  schemaVersion?: string; // emitted when includeVersion is true
}
| Field | Notes |
|---|---|
| bytesIn | UTF-8 byte length for string payloads, byteLength for binary. |
| bytesOut | GET returns the size of the response body; LIST sums per-entry size; HEAD records the object size without fetching the body. |
| ok | false when the underlying transport call threw. Event is still emitted; the error is re-thrown after emission. |
| errorCode | Resolved from err.code, err.Code, then err.name. |
| extra | Non-reserved keys from AnalyticsContext flow through here. Use for workflowId, runId, feature, requestId, … |
| source | Defaults to "transport" for SDK calls. "public-url" for estimateEgressOnPublicUrl events. "cdn" for recordExternalEgress reconciliation. |
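
The bytesIn and errorCode rules in the table can be sketched roughly as follows. This is an illustration only: resolveErrorCode and payloadBytes are hypothetical helper names, not functions exported by the package, and the real implementation may differ in detail.

```typescript
// Hypothetical helpers mirroring the field notes above; not the package's own code.

// Try err.code, then err.Code, then err.name, and return the first non-empty string.
function resolveErrorCode(err: unknown): string | undefined {
  if (typeof err !== "object" || err === null) return undefined;
  const e = err as { code?: unknown; Code?: unknown; name?: unknown };
  for (const candidate of [e.code, e.Code, e.name]) {
    if (typeof candidate === "string" && candidate.length > 0) return candidate;
  }
  return undefined;
}

// UTF-8 byte length for strings, byteLength for binary payloads.
function payloadBytes(body: string | ArrayBufferView | ArrayBuffer): number {
  return typeof body === "string"
    ? new TextEncoder().encode(body).byteLength
    : body.byteLength;
}
```

Note that the string case counts bytes, not characters, so multi-byte characters make bytesIn larger than the string's length.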
By default getPublicUrl performs no I/O and emits no event. Set estimateEgressOnPublicUrl: true to opt into recording an estimated-egress event on issuance.
Per-request context
runWithContext uses AsyncLocalStorage so HTTP handlers can tag a request with { tenant, action } once and have every downstream S3Worm call inherit those tags automatically, with no need to thread arguments through every call site.
import { runWithContext } from "@decoperations/s3worm-analytics";
app.use((req, _res, next) =>
  runWithContext(
    { tenant: req.session.address, action: req.routeName },
    next
  )
);
When tenant / action resolvers are not passed to withAnalytics, the wrapper falls back to the ambient runWithContext value. Explicit resolvers win over ambient context.
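To make the ambient-context mechanism concrete, here is a minimal sketch of AsyncLocalStorage-based propagation using Node's built-in node:async_hooks module. The names runWithContextSketch, getContextSketch and describeCall are hypothetical stand-ins, and the package's real internals may differ.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical stand-ins for the package's helpers.
interface RequestContext {
  tenant?: string;
  action?: string;
}

const storage = new AsyncLocalStorage<RequestContext>();

// Run fn with ctx as the ambient context for everything it calls.
function runWithContextSketch<T>(ctx: RequestContext, fn: () => T): T {
  return storage.run(ctx, fn);
}

// Read the ambient context, or undefined when called outside any run().
function getContextSketch(): RequestContext | undefined {
  return storage.getStore();
}

// A "deep" call site that never receives tenant / action as arguments.
function describeCall(key: string): string {
  const ctx = getContextSketch();
  return `${key} tenant=${ctx?.tenant ?? "-"} action=${ctx?.action ?? "-"}`;
}
```

Inside runWithContextSketch the deep call site sees the tags; outside it, getContextSketch returns undefined, which is the point at which a fallback such as explicit resolvers would be needed.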
Edge runtimes: node:async_hooks is unavailable on some edge platforms. The package falls back to a process-global ref-cell, which is not safe across concurrent requests. The first call to runWithContext from a fallback runtime emits a console.warn so the problem surfaces in logs; pass explicit resolvers in AnalyticsOptions (or use attributeFromKey) when running on edge.
Recover tenant from the storage key
For background workers / cron jobs where no HTTP context exists, recover tenant from the key shape:
withAnalytics(client, {
  sink,
  attributeFromKey: (key) => {
    const m = /^users\/([^/]+)\//.exec(key);
    return m ? { tenant: m[1] } : undefined;
  },
});
attributeFromKey runs only when ambient context + explicit resolvers come up empty.
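The documented precedence (explicit resolver, then ambient context, then attributeFromKey) can be sketched as a small function. AttributionSources and resolveTenant are hypothetical names used only for illustration; the package resolves action the same way according to the options table, but its actual code is not shown here.

```typescript
// Hypothetical sketch of the documented resolution order; not the package's code.
interface Attribution {
  tenant?: string;
  action?: string;
}

interface AttributionSources {
  tenant?: () => string | undefined; // explicit resolver
  ambient?: Attribution; // value from runWithContext, if any
  attributeFromKey?: (key: string) => Attribution | undefined;
}

function resolveTenant(key: string, src: AttributionSources): string | undefined {
  // 1. An explicit resolver wins when it yields a value.
  const fromResolver = src.tenant?.();
  if (fromResolver !== undefined) return fromResolver;
  // 2. Otherwise use the ambient context.
  const fromAmbient = src.ambient?.tenant;
  if (fromAmbient !== undefined) return fromAmbient;
  // 3. Only now is the key parsed.
  return src.attributeFromKey?.(key)?.tenant;
}
```

Because the key parser is consulted last, it is never invoked when an earlier source already produced a tenant.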
Custom per-call dimensions
Two ways to attach workflowId / runId / feature / … to events:
// (a) via ambient context: all non-reserved keys flow into event.extra
runWithContext({ tenant, action, workflowId, runId, feature }, () => /* … */);
// (b) via an enrich() hook: runs after buildEvent, can mutate or replace
withAnalytics(client, {
  sink,
  enrich: (event) => ({
    ...event,
    extra: { ...(event.extra ?? {}), region: process.env.REGION },
  }),
});
Built-in sinks
All sinks are exported from @decoperations/s3worm-analytics. The /sinks subpath exists for tree-shaking-conscious consumers.
| Sink | Use case |
|---|---|
| ConsoleSink | Dev / debug. json: true mode emits machine-readable lines. |
| MemorySink | Tests. .events array + .byOp() helper. |
| MultiSink | Fan-out to several sinks at once. Per-sink failures are isolated. |
| BufferedHttpSink | POST batched JSON { events: UsageEvent[] } to your ingest endpoint. Supports retry-with-backoff and pending(). |
| S3JsonlSink | Eat-your-own-dogfood: write JSONL audit logs back to an S3 bucket. Optional tenantInPath shards by tenant. |
| OtelSink | OpenTelemetry: emits Counters for requests / bytes_in / bytes_out and a Histogram for duration_ms. Requires @opentelemetry/api. |
| SentrySink | Adds a Sentry breadcrumb per event and tags the active scope with s3worm.tenant / s3worm.action for crash attribution. Requires a Sentry SDK. |
BufferedHttpSink
Flushes when either batchSize events are buffered or flushIntervalMs elapses. Always safe to call flush() manually before process exit. The sink implements Symbol.asyncDispose so it works under await using sink = new BufferedHttpSink(…).
import { BufferedHttpSink } from "@decoperations/s3worm-analytics";
const sink = new BufferedHttpSink({
  url: "https://ingest.example.com/usage",
  batchSize: 100,
  flushIntervalMs: 10_000,
  headers: { authorization: "Bearer …" },
  retry: { attempts: 3, initialDelayMs: 250, factor: 2, maxDelayMs: 5_000 },
});
console.log(`pending: ${sink.pending()}`); // process-shutdown drain logic
retry opts in to exponential backoff on transient 5xx / 408 / 429 responses and network errors. Non-retryable status codes (4xx other than 408 / 429) fail immediately.
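As a rough illustration of what the retry settings imply, the sketch below computes a capped exponential delay schedule and classifies status codes as described above. RetryPolicy, backoffDelays and isRetryableStatus are hypothetical names, and the sketch assumes that attempts counts total tries (so there are attempts - 1 waits) and that no jitter is applied; the sink's actual behavior may differ on both points.

```typescript
// Hypothetical sketch; field names follow the retry option shown above.
interface RetryPolicy {
  attempts: number;
  initialDelayMs: number;
  factor: number;
  maxDelayMs: number;
}

// Assumption: `attempts` counts total tries, so there are attempts - 1 waits.
function backoffDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let i = 0; i < policy.attempts - 1; i++) {
    const raw = policy.initialDelayMs * Math.pow(policy.factor, i);
    delays.push(Math.min(raw, policy.maxDelayMs)); // cap at maxDelayMs
  }
  return delays;
}

// 5xx, 408 and 429 are treated as transient; other 4xx fail immediately.
function isRetryableStatus(status: number): boolean {
  return status === 408 || status === 429 || (status >= 500 && status <= 599);
}
```

Under these assumptions, the configuration shown earlier would wait 250 ms and then 500 ms before giving up; the 5,000 ms cap only matters for larger attempt counts.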
S3JsonlSink
Writes per-flush JSONL objects under a templated key (default prefix/YYYY/MM/DD/HH/<ts>-<rand>.jsonl). Pass a bare (non-instrumented) transport so the writes themselves aren't recorded as events — otherwise you'll create a feedback loop.
import { S3Client } from "@decoperations/s3worm";
import { S3JsonlSink } from "@decoperations/s3worm-analytics";
const auditTransport = new S3Client({ bucket: "my-audit-bucket", /* … */ });
new S3JsonlSink({
  transport: auditTransport,
  prefix: "_analytics",
  batchSize: 500,
  flushIntervalMs: 30_000,
  tenantInPath: true, // → _analytics/<tenant>/YYYY/MM/DD/HH/<ts>-<rand>.jsonl
});
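The key layout and file format can be sketched as below. jsonlKey and toJsonl are hypothetical helpers, and several details are assumptions rather than documented behavior: UTC date parts, epoch milliseconds for <ts>, and a caller-supplied <rand> suffix.

```typescript
// Hypothetical key builder following the default layout described above.
// Assumptions: UTC date parts, epoch milliseconds for <ts>, caller-supplied <rand>.
function pad2(n: number): string {
  return n < 10 ? `0${n}` : String(n);
}

function jsonlKey(prefix: string, when: Date, rand: string, tenant?: string): string {
  const parts = [
    prefix,
    ...(tenant ? [tenant] : []), // tenant segment only when sharding by tenant
    String(when.getUTCFullYear()),
    pad2(when.getUTCMonth() + 1),
    pad2(when.getUTCDate()),
    pad2(when.getUTCHours()),
    `${when.getTime()}-${rand}.jsonl`,
  ];
  return parts.join("/");
}

// JSONL body: one JSON document per line, newline-terminated.
function toJsonl(events: unknown[]): string {
  return events.map((e) => JSON.stringify(e)).join("\n") + "\n";
}
```

Hour-level prefixes keep each listing small and let downstream jobs process one hour (or one tenant) at a time.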
OtelSink
For each event, updates three Counters (s3worm.requests, s3worm.bytes_in, s3worm.bytes_out) and one Histogram (s3worm.duration_ms), all tagged with op / bucket / tenant / action / ok. @opentelemetry/api is an optional peer dependency.
import { metrics } from "@opentelemetry/api";
import { OtelSink } from "@decoperations/s3worm-analytics";
const sink = new OtelSink({
  meter: metrics.getMeter("my-service"),
  metricsPrefix: "s3worm",
  histogramBuckets: { durationMs: [5, 10, 50, 100, 500, 1_000, 5_000] },
});
SentrySink
Adds a Sentry breadcrumb per event and tags the active scope with s3worm.tenant / s3worm.action. @sentry/node (or any Sentry SDK with addBreadcrumb / withScope) is an optional peer dependency.
import * as Sentry from "@sentry/node";
import { SentrySink } from "@decoperations/s3worm-analytics";
new SentrySink({ client: Sentry, category: "s3worm", tagScope: true });
Custom sink
Implement AnalyticsSink:
import type { AnalyticsSink, UsageEvent } from "@decoperations/s3worm-analytics";
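class StatsdSink implements AnalyticsSink {
  record(e: UsageEvent) {
    try {
      // `statsd` is assumed to be a StatsD client instance already in scope.
      statsd.increment(`s3.${e.op.toLowerCase()}`, 1, [`tenant:${e.tenant ?? "unknown"}`]);
      statsd.histogram("s3.bytes_out", e.bytesOut, [`action:${e.action ?? "unknown"}`]);
    } catch {
      // record must not throw, so metric failures are dropped here.
    }
  }
}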
record MUST NOT throw. Sinks should swallow their own errors so that instrumentation failures never break the underlying S3 operation. If a sink does throw (or its returned Promise rejects), the wrapper catches the error and routes it to the onError callback in AnalyticsOptions (default: console.warn).
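The isolation described above can be sketched as a small dispatch helper that handles both a synchronous throw and a rejected Promise. safeRecord and SinkLike are hypothetical names; this is an illustration of the technique, not the wrapper's actual code.

```typescript
// Hypothetical sketch of isolating sink failures from the caller.
interface SinkLike<E> {
  record(event: E): void | Promise<void>;
}

function safeRecord<E>(
  sink: SinkLike<E>,
  event: E,
  onError: (err: unknown, event: E) => void,
): void {
  try {
    const result = sink.record(event);
    // Async sinks: attach a handler so a rejection never becomes unhandled.
    if (result instanceof Promise) {
      result.catch((err: unknown) => onError(err, event));
    }
  } catch (err) {
    // Sync sinks: a throw is reported, never propagated.
    onError(err, event);
  }
}
```

The helper returns immediately in both cases, so a slow or failing sink does not delay or fail the S3 operation being measured.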
API reference
withAnalytics(client, options)
Wrap an existing S3Worm instance. Mutates and returns it.
wrapTransport(transport, options)
Return a new Transport that decorates transport. The original is untouched.
AnalyticsOptions
| Field | Type | Notes |
|---|---|---|
| sink | AnalyticsSink \| AnalyticsSink[] | Required. Multiple sinks fan out. |
| tenant | () => string \| undefined | Optional. Falls back to getContext().tenant, then attributeFromKey. |
| action | () => string \| undefined | Optional. Falls back to getContext().action, then attributeFromKey. |
| bucket | string \| () => string \| undefined | Optional override of the bucket reported on events. Resolver form is called per call. |
| onError | (err, event) => void | Optional. Defaults to console.warn. |
| before | (preEvent) => void \| Promise<void> | Optional pre-call hook. Throw to short-circuit (quota / circuit-breaker). A { ok: false } event is still recorded. |
| enrich | (event) => UsageEvent \| void | Optional. Runs after buildEvent. Use to attach per-call dimensions. |
| attributeFromKey | (key, op) => { tenant?, action?, extra? } | Recover dimensions from the storage key when ambient context + resolvers come up empty. |
| sample | number \| (event) => number | Sampling rate in [0, 1]. Kept events carry sampled: true and sampleWeight = 1 / rate. Errors are always kept. |
| includeVersion | boolean | Attach schemaVersion to every event. |
| estimateEgressOnPublicUrl | boolean | Emit a source: "public-url" event when getPublicUrl() is called. |
| estimateBytes | (key) => number \| undefined | Caller-supplied size for the public-URL egress event. |
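
To illustrate how sample and sampleWeight fit together, the sketch below makes a keep/drop decision and then extrapolates totals from the kept events. decideSample and estimateBytesOut are hypothetical names, and the weight of 1 given to always-kept error events is an assumption, not documented behavior.

```typescript
// Hypothetical sketch of rate-based sampling; not the package's API.
interface SampledFields {
  sampled: boolean;
  sampleWeight: number;
}

// Returns the fields to attach when the event is kept, or null when it is dropped.
// `random` is injectable so the decision can be tested deterministically.
function decideSample(
  rate: number,
  ok: boolean,
  random: () => number = Math.random,
): SampledFields | null {
  // Assumption: failed calls bypass sampling and count once each.
  if (!ok) return { sampled: true, sampleWeight: 1 };
  if (rate <= 0) return null;
  if (rate >= 1) return { sampled: true, sampleWeight: 1 };
  return random() < rate ? { sampled: true, sampleWeight: 1 / rate } : null;
}

// Downstream extrapolation: each kept event stands in for `sampleWeight` real calls.
function estimateBytesOut(
  events: Array<{ bytesOut: number; sampleWeight?: number }>,
): number {
  return events.reduce((sum, e) => sum + e.bytesOut * (e.sampleWeight ?? 1), 0);
}
```

With a rate of 0.25, each kept event carries a weight of 4, so summing bytesOut times sampleWeight gives an estimate of total egress rather than only the sampled share.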
recordExternalEgress(event)
AnalyticsTransport (returned by wrapTransport, or accessible via getAnalyticsTransport(worm) after withAnalytics) exposes a public recordExternalEgress method so you can feed CloudFront / R2 / imgix access-log entries through the same sinks. Defaults source to "cdn".
import { wrapTransport, getAnalyticsTransport } from "@decoperations/s3worm-analytics";
const transport = wrapTransport(inner, { sink });
transport.recordExternalEgress({
  op: "GET",
  bucket: "primary",
  key: "users/abc/avatar.png",
  bytesIn: 0,
  bytesOut: 12_345,
  durationMs: 0,
  ok: true,
  tenant: "abc",
  action: "cdn.serve",
  timestamp: new Date().toISOString(),
});
runWithContext(ctx, fn) / getContext() / withContext(patch, fn)
AsyncLocalStorage-backed helpers for ambient context propagation.
Failure semantics
- Sink errors (sync throw or rejected Promise) are caught and routed to onError. Instrumentation never breaks the underlying operation.
- Transport errors are re-thrown after a { ok: false, errorCode } event is recorded.
- getPublicUrl does no I/O and, by default, emits no event (set estimateEgressOnPublicUrl: true to record an estimated-egress event).