
    Analytics

    @decoperations/s3worm-analytics is an optional middleware layer that records a UsageEvent for every PUT / GET / HEAD / LIST / DELETE / COPY call made through an S3Worm instance, broken down by tenant and per-feature action. Use it for usage dashboards, per-tenant quotas / billing, cost attribution per feature, and detection of egress hotspots.

    Install

    pnpm add @decoperations/s3worm-analytics
    

    Quick start

    import { S3Worm } from "@decoperations/s3worm";
    import { withAnalytics, ConsoleSink } from "@decoperations/s3worm-analytics";
    
    const worm = withAnalytics(new S3Worm({ bucket: "my-bucket", /* … */ }), {
      sink: new ConsoleSink(),
      tenant: () => "user-123",
      action: () => "files.upload",
    });
    
    await worm.create("docs/hello.json", { hi: true });
    // [s3worm] PUT    docs/hello.json in=12B out=0B 8ms ok tenant=user-123 action=files.upload
    

    withAnalytics mutates the S3Worm's underlying transport in place. It returns the same instance for fluent use.

    Strictly typed alternative

    When you control where the S3Worm is constructed, wrap the transport directly:

    import { S3Worm, S3Client } from "@decoperations/s3worm";
    import { wrapTransport, ConsoleSink } from "@decoperations/s3worm-analytics";
    
    const config = { bucket: "my-bucket" /* , … */ };
    
    const transport = wrapTransport(new S3Client(config), {
      sink: new ConsoleSink(),
    });
    
    const worm = new S3Worm({ ...config, transport });
    

    UsageEvent

    interface UsageEvent {
      op: "PUT" | "GET" | "HEAD" | "LIST" | "DELETE" | "COPY";
      bucket: string;
      key: string;
      bytesIn: number;        // PUT body size
      bytesOut: number;       // GET ContentLength, sum of LIST sizes, HEAD size
      durationMs: number;
      tenant?: string;
      action?: string;
      ok: boolean;
      errorCode?: string;
      timestamp: string;      // ISO-8601
      extra?: Record<string, unknown>;  // per-call dimensions (workflowId, runId, …)
      sampled?: boolean;      // populated when AnalyticsOptions.sample is on
      sampleWeight?: number;  // 1 / rate — for downstream extrapolation
      source?: "transport" | "cdn" | "public-url" | string;  // event origin
      schemaVersion?: string; // emitted when includeVersion is true
    }
    
    Field notes:
    • bytesIn: UTF-8 byte length for string payloads; byteLength for binary payloads.
    • bytesOut: for GET, the size of the response body; for LIST, the sum of the per-entry sizes; for HEAD, the object size (the body is not fetched).
    • ok: false when the underlying transport call threw. The event is still emitted, and the error is re-thrown after emission.
    • errorCode: resolved from err.code, then err.Code, then err.name.
    • extra: receives the non-reserved keys from AnalyticsContext. Use it for workflowId, runId, feature, requestId, …
    • source: "transport" (the default) for SDK calls, "public-url" for events emitted under estimateEgressOnPublicUrl, and "cdn" for recordExternalEgress reconciliation.
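
    The bytesIn and errorCode rules above can be illustrated with two standalone helpers. This is a sketch, not the package's source. payloadBytes and resolveErrorCode are hypothetical names. The sketch also assumes that non-string or empty values count as "missing" during the code / Code / name lookup.

```typescript
// Hypothetical helpers illustrating the documented field rules; not the package's code.

// bytesIn: UTF-8 byte length for strings, byteLength for binary payloads.
function payloadBytes(body: string | Uint8Array | ArrayBuffer): number {
  if (typeof body === "string") {
    // TextEncoder always produces UTF-8, so multi-byte characters count correctly.
    return new TextEncoder().encode(body).byteLength;
  }
  return body.byteLength;
}

// errorCode: try err.code, then err.Code, then err.name.
// Assumption: only non-empty string values are accepted at each step.
function resolveErrorCode(err: unknown): string | undefined {
  if (typeof err !== "object" || err === null) return undefined;
  const e = err as { code?: unknown; Code?: unknown; name?: unknown };
  for (const candidate of [e.code, e.Code, e.name]) {
    if (typeof candidate === "string" && candidate.length > 0) return candidate;
  }
  return undefined;
}

console.log(payloadBytes("€")); // 3: U+20AC encodes to three UTF-8 bytes
console.log(resolveErrorCode({ code: "ECONNRESET", name: "Error" })); // ECONNRESET
```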

    By default getPublicUrl performs no I/O and emits no event. Set estimateEgressOnPublicUrl: true to opt into recording an estimated-egress event on issuance.


    Per-request context

    runWithContext uses AsyncLocalStorage so that an HTTP handler can tag a request with { tenant, action } once. Every downstream S3Worm call then inherits those tags automatically, so you do not need to thread arguments through every call site.

    import { runWithContext } from "@decoperations/s3worm-analytics";
    
    app.use((req, _res, next) =>
      runWithContext(
        { tenant: req.session.address, action: req.routeName },
        next
      )
    );
    

    When tenant / action resolvers are not passed to withAnalytics, the wrapper falls back to the ambient runWithContext value. Explicit resolvers win over ambient context.

    Edge runtimes: node:async_hooks is unavailable on some edge platforms. On those platforms, the package falls back to a process-global reference cell, which is not safe across concurrent requests. The first call to runWithContext on such a runtime emits a console.warn so the problem surfaces in logs. When running on the edge, pass explicit resolvers in AnalyticsOptions (or use attributeFromKey) instead.

    Recover tenant from the storage key

    Background workers and cron jobs have no HTTP context. In those cases, recover the tenant from the shape of the key:

    withAnalytics(client, {
      sink,
      attributeFromKey: (key) => {
        const m = /^users\/([^/]+)\//.exec(key);
        return m ? { tenant: m[1] } : undefined;
      },
    });
    

    attributeFromKey runs only when neither explicit resolvers nor ambient context supply a value.
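
    The precedence chain is explicit resolver, then ambient context, then attributeFromKey, and it fits in one expression. The sketch below is a hypothetical illustration: resolveDimension and tenantFromKey are not exports of the package. It also shows that the key lookup can be evaluated lazily, so the regex never runs when a resolver or the ambient context already supplied a value.

```typescript
// Hypothetical sketch of the documented resolution order for tenant / action:
// explicit resolver, then ambient context, then attributeFromKey.
type Dims = { tenant?: string; action?: string };
type Resolver = () => string | undefined;

function resolveDimension(
  field: keyof Dims,
  explicit: Resolver | undefined,
  ambient: Dims | undefined,
  fromKey: () => Dims | undefined,
): string | undefined {
  // `??` short-circuits, so fromKey() only runs when both earlier sources are empty.
  return explicit?.() ?? ambient?.[field] ?? fromKey()?.[field];
}

// The same key shape as the attributeFromKey example above.
const tenantFromKey = (key: string): Dims | undefined => {
  const m = /^users\/([^/]+)\//.exec(key);
  return m ? { tenant: m[1] } : undefined;
};

console.log(resolveDimension("tenant", undefined, undefined, () => tenantFromKey("users/abc/a.json"))); // abc
```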

    Custom per-call dimensions

    Two ways to attach workflowId / runId / feature / … to events:

    // (a) via ambient context: all non-reserved keys flow into event.extra
    runWithContext({ tenant, action, workflowId, runId, feature }, () => { /* … */ });
    
    // (b) via an enrich() hook: runs after buildEvent and can mutate or replace the event
    withAnalytics(client, {
      sink,
      enrich: (event) => ({
        ...event,
        extra: { ...(event.extra ?? {}), region: process.env.REGION },
      }),
    });
    

    Built-in sinks

    All sinks are exported from @decoperations/s3worm-analytics. The /sinks subpath exists for tree-shaking-conscious consumers.

    • ConsoleSink: dev / debug. The json: true mode emits machine-readable lines.
    • MemorySink: tests. Exposes an .events array and a .byOp() helper.
    • MultiSink: fans out to several sinks at once. Per-sink failures are isolated.
    • BufferedHttpSink: POSTs batched JSON ({ events: UsageEvent[] }) to your ingest endpoint. Supports retry with backoff and pending().
    • S3JsonlSink: dogfooding. Writes JSONL audit logs back to an S3 bucket. The optional tenantInPath setting shards keys by tenant.
    • OtelSink: OpenTelemetry. Emits Counters for requests, bytes_in, and bytes_out and a Histogram for duration_ms. Requires @opentelemetry/api.
    • SentrySink: adds a Sentry breadcrumb per event and tags the active scope with s3worm.tenant / s3worm.action for crash attribution. Requires a Sentry SDK.

    BufferedHttpSink

    The sink flushes when batchSize events are buffered or when flushIntervalMs elapses, whichever comes first. Calling flush() manually before process exit is always safe. The sink implements Symbol.asyncDispose, so it also works with await using sink = new BufferedHttpSink(…).

    import { BufferedHttpSink } from "@decoperations/s3worm-analytics";
    
    const sink = new BufferedHttpSink({
      url: "https://ingest.example.com/usage",
      batchSize: 100,
      flushIntervalMs: 10_000,
      headers: { authorization: "Bearer …" },
      retry: { attempts: 3, initialDelayMs: 250, factor: 2, maxDelayMs: 5_000 },
    });
    
    console.log(`pending: ${sink.pending()}`);  // process-shutdown drain logic
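
    The flush trigger described above (batchSize or flushIntervalMs, whichever comes first) can be sketched as a small buffer class. This is a hypothetical illustration, not BufferedHttpSink's implementation. The BatchBuffer name and the injected send function are assumptions, and retries, HTTP, and Symbol.asyncDispose are deliberately left out.

```typescript
// Hypothetical batching buffer: flushes on size or on a timer, whichever comes first.
type Send<E> = (batch: E[]) => Promise<void>;

class BatchBuffer<E> {
  private buffer: E[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly send: Send<E>;
  private readonly batchSize: number;
  private readonly flushIntervalMs: number;

  constructor(send: Send<E>, batchSize: number, flushIntervalMs: number) {
    this.send = send;
    this.batchSize = batchSize;
    this.flushIntervalMs = flushIntervalMs;
  }

  push(event: E): void {
    this.buffer.push(event);
    if (this.buffer.length >= this.batchSize) {
      this.flushQuietly(); // size trigger
    } else if (this.timer === undefined) {
      // Time trigger: armed by the first event of a new batch.
      this.timer = setTimeout(() => this.flushQuietly(), this.flushIntervalMs);
    }
  }

  pending(): number {
    return this.buffer.length;
  }

  // Safe to call at any time (e.g. before process exit); a no-op when empty.
  async flush(): Promise<void> {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = []; // swap first so events pushed during send start a new batch
    await this.send(batch);
  }

  private flushQuietly(): void {
    // Background flushes must not surface as unhandled rejections.
    this.flush().catch(() => { /* a real sink would retry or report via onError */ });
  }
}
```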
    

    Setting retry opts in to exponential backoff on transient 5xx, 408, and 429 responses and on network errors. All other 4xx status codes are non-retryable and fail immediately.
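
    The retry policy can be sketched as two pure functions. One classifies a response as retryable, and the other computes the capped exponential delay. This is a hypothetical illustration of the rules above, not BufferedHttpSink's code. It adds no jitter and does not model whether attempts counts the initial request.

```typescript
// Hypothetical sketch of the documented retry policy; not BufferedHttpSink's code.
interface RetryOptions {
  attempts: number;
  initialDelayMs: number;
  factor: number;
  maxDelayMs: number;
}

// `status` is undefined when no HTTP response arrived (network error).
function isRetryable(status: number | undefined): boolean {
  if (status === undefined) return true;
  return (status >= 500 && status <= 599) || status === 408 || status === 429;
}

// Delay before retry number retryIndex (0-based): initialDelayMs * factor^retryIndex, capped.
function backoffDelayMs(retryIndex: number, opts: RetryOptions): number {
  return Math.min(opts.initialDelayMs * opts.factor ** retryIndex, opts.maxDelayMs);
}

const retry: RetryOptions = { attempts: 3, initialDelayMs: 250, factor: 2, maxDelayMs: 5_000 };
// 250, 500, 1000, ... until the 5000 ms cap kicks in (250 * 2^5 = 8000 is capped)
console.log([0, 1, 2, 5].map((i) => backoffDelayMs(i, retry)));
```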

    S3JsonlSink

    Writes one JSONL object per flush under a templated key (default: prefix/YYYY/MM/DD/HH/<ts>-<rand>.jsonl). Pass a bare (non-instrumented) transport so that the sink's own writes are not recorded as events. Otherwise every flush generates new events, which creates a feedback loop.

    import { S3Client } from "@decoperations/s3worm";
    import { S3JsonlSink } from "@decoperations/s3worm-analytics";
    
    const auditTransport = new S3Client({ bucket: "my-audit-bucket", /* … */ });
    
    new S3JsonlSink({
      transport: auditTransport,
      prefix: "_analytics",
      batchSize: 500,
      flushIntervalMs: 30_000,
      tenantInPath: true,  // → _analytics/<tenant>/YYYY/MM/DD/HH/<ts>-<rand>.jsonl
    });
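
    Below is a minimal sketch of the default key template and the JSONL body. It is an illustration built on several assumptions, not S3JsonlSink's implementation. The date parts are taken in UTC, <ts> is rendered as epoch milliseconds, and the random suffix is passed in by the caller. Tenant values are not sanitized, so a tenant containing "/" would add path segments.

```typescript
// Hypothetical sketch of the default key layout and JSONL body; not S3JsonlSink's code.
// Assumptions: UTC date parts, <ts> = epoch milliseconds, <rand> supplied by the caller.
function jsonlKey(prefix: string, at: Date, rand: string, tenant?: string): string {
  const pad = (n: number): string => String(n).padStart(2, "0");
  const parts = [prefix];
  if (tenant !== undefined) parts.push(tenant); // tenantInPath: true
  parts.push(
    String(at.getUTCFullYear()),
    pad(at.getUTCMonth() + 1), // getUTCMonth() is zero-based
    pad(at.getUTCDate()),
    pad(at.getUTCHours()),
  );
  return `${parts.join("/")}/${at.getTime()}-${rand}.jsonl`;
}

// One JSON document per line, each newline-terminated.
function toJsonl(events: unknown[]): string {
  return events.map((e) => JSON.stringify(e) + "\n").join("");
}

const at = new Date(Date.UTC(2024, 0, 5, 9, 30));
console.log(jsonlKey("_analytics", at, "k3x9", "user-123"));
// _analytics/user-123/2024/01/05/09/1704447000000-k3x9.jsonl
```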
    

    OtelSink

    For every event, records to three Counters (s3worm.requests, s3worm.bytes_in, s3worm.bytes_out) and one Histogram (s3worm.duration_ms). All measurements are tagged with op, bucket, tenant, action, and ok. @opentelemetry/api is an optional peer dependency.

    import { metrics } from "@opentelemetry/api";
    import { OtelSink } from "@decoperations/s3worm-analytics";
    
    const sink = new OtelSink({
      meter: metrics.getMeter("my-service"),
      metricsPrefix: "s3worm",
      histogramBuckets: { durationMs: [5, 10, 50, 100, 500, 1_000, 5_000] },
    });
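
    The sketch below shows how one event fans out into the four instruments. It is a hypothetical recorder written against minimal structural interfaces (MeterLike, CounterLike, and HistogramLike, all assumptions) that mirror the createCounter / add and createHistogram / record shape of the @opentelemetry/api Meter. It is not OtelSink's source. A fake in-memory meter stands in for a real one, so the sketch runs without the OpenTelemetry packages. Leaving undefined tenant or action out of the attributes, rather than sending the string "undefined", is a choice made for the sketch. Histogram bucket configuration is not modeled.

```typescript
// Hypothetical sketch; these interfaces only mirror the shape of the OTel Meter API.
type Attrs = Record<string, string | boolean>;

interface CounterLike {
  add(value: number, attributes?: Attrs): void;
}
interface HistogramLike {
  record(value: number, attributes?: Attrs): void;
}
interface MeterLike {
  createCounter(name: string): CounterLike;
  createHistogram(name: string): HistogramLike;
}

// The subset of UsageEvent fields this sketch needs.
interface MetricEvent {
  op: string;
  bucket: string;
  tenant?: string;
  action?: string;
  ok: boolean;
  bytesIn: number;
  bytesOut: number;
  durationMs: number;
}

function makeRecorder(meter: MeterLike, prefix = "s3worm"): (e: MetricEvent) => void {
  // Instruments are created once; each event then adds to all four of them.
  const requests = meter.createCounter(`${prefix}.requests`);
  const bytesIn = meter.createCounter(`${prefix}.bytes_in`);
  const bytesOut = meter.createCounter(`${prefix}.bytes_out`);
  const duration = meter.createHistogram(`${prefix}.duration_ms`);

  return (e) => {
    const attrs: Attrs = { op: e.op, bucket: e.bucket, ok: e.ok };
    if (e.tenant !== undefined) attrs.tenant = e.tenant; // skip rather than send "undefined"
    if (e.action !== undefined) attrs.action = e.action;
    requests.add(1, attrs);
    bytesIn.add(e.bytesIn, attrs);
    bytesOut.add(e.bytesOut, attrs);
    duration.record(e.durationMs, attrs);
  };
}
```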
    

    SentrySink

    Adds a Sentry breadcrumb per event and tags the active scope with s3worm.tenant / s3worm.action. @sentry/node (or any Sentry SDK with addBreadcrumb / withScope) is an optional peer dependency.

    import * as Sentry from "@sentry/node";
    import { SentrySink } from "@decoperations/s3worm-analytics";
    
    new SentrySink({ client: Sentry, category: "s3worm", tagScope: true });
    

    Custom sink

    Implement AnalyticsSink:

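    import type { AnalyticsSink, UsageEvent } from "@decoperations/s3worm-analytics";
    import { statsd } from "./metrics"; // your app's StatsD client (illustrative path)
    
    class StatsdSink implements AnalyticsSink {
      record(e: UsageEvent) {
        try {
          statsd.increment(`s3.${e.op.toLowerCase()}`, 1, [`tenant:${e.tenant ?? "unknown"}`]);
          statsd.histogram("s3.bytes_out", e.bytesOut, [`action:${e.action ?? "unknown"}`]);
        } catch {
          // record must not throw: drop the metric rather than break the S3 call
        }
      }
    }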
    

    record MUST NOT throw. Sinks should swallow their own errors so that instrumentation failures never break the underlying S3 operation. If a sink does throw (or its returned Promise rejects), the wrapper catches the error and routes it to the onError callback in AnalyticsOptions (default: console.warn).
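
    The catch-and-route behavior can be sketched as a small dispatch function. This is a hypothetical illustration, not the wrapper's code. SinkLike and dispatch are assumed names, and the sketch fans out to several sinks so that one failing sink does not stop the others from receiving the event.

```typescript
// Hypothetical sketch: deliver one event to every sink without ever throwing.
interface SinkLike<E> {
  record(event: E): void | Promise<void>;
}

function dispatch<E>(
  sinks: SinkLike<E>[],
  event: E,
  onError: (err: unknown, event: E) => void,
): void {
  for (const sink of sinks) {
    try {
      // Promise.resolve adopts a returned Promise and wraps a plain void return,
      // so async rejections are routed to onError as well.
      void Promise.resolve(sink.record(event)).catch((err: unknown) => onError(err, event));
    } catch (err) {
      onError(err, event); // synchronous throw from record()
    }
  }
}
```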


    API reference

    withAnalytics(client, options)

    Wrap an existing S3Worm instance. Mutates and returns it.

    wrapTransport(transport, options)

    Return a new Transport that decorates transport. The original is untouched.

    AnalyticsOptions

    • sink (AnalyticsSink | AnalyticsSink[]): Required. Multiple sinks fan out.
    • tenant (() => string | undefined): Optional. Falls back to getContext().tenant, then attributeFromKey.
    • action (() => string | undefined): Optional. Falls back to getContext().action, then attributeFromKey.
    • bucket (string | () => string | undefined): Optional override of the bucket reported on events. The resolver form is called on every call.
    • onError ((err, event) => void): Optional. Defaults to console.warn.
    • before ((preEvent) => void | Promise<void>): Optional pre-call hook. Throw to short-circuit the call (quota / circuit breaker); a { ok: false } event is still recorded.
    • enrich ((event) => UsageEvent | void): Optional. Runs after buildEvent. Use it to attach per-call dimensions.
    • attributeFromKey ((key, op) => { tenant?, action?, extra? }): Recovers dimensions from the storage key when ambient context and resolvers come up empty.
    • sample (number | (event) => number): Sampling rate in [0, 1]. Kept events carry sampled: true and sampleWeight = 1 / rate. Errors are always kept.
    • includeVersion (boolean): Attach schemaVersion to every event.
    • estimateEgressOnPublicUrl (boolean): Emit a source: "public-url" event when getPublicUrl() is called.
    • estimateBytes ((key) => number | undefined): Caller-supplied size for the public-URL egress event.
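
    The sample semantics can be sketched as a pure function with an injectable random source. This is a hypothetical illustration, not the package's code. Passing error events through without sampled or sampleWeight is an assumption, because the option description only says that errors are always kept.

```typescript
// Hypothetical sketch of the `sample` option; not the package's code.
interface SampleFields {
  ok: boolean;
  sampled?: boolean;
  sampleWeight?: number;
}

function applySampling<E extends SampleFields>(
  event: E,
  rate: number | ((event: E) => number),
  random: () => number = Math.random, // injectable for deterministic tests
): E | undefined {
  // Errors are always kept. Assumption: they pass through unweighted.
  if (!event.ok) return event;
  const raw = typeof rate === "function" ? rate(event) : rate;
  const r = Math.min(1, Math.max(0, raw)); // clamp into [0, 1]
  if (random() >= r) return undefined; // dropped; r = 0 drops every successful call
  return { ...event, sampled: true, sampleWeight: 1 / r };
}
```

    Downstream, summing sampleWeight over stored events (counting an event without a weight as 1) estimates how many calls actually happened.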

    recordExternalEgress(event)

    AnalyticsTransport (returned by wrapTransport, or obtained via getAnalyticsTransport(worm) after withAnalytics) exposes a public recordExternalEgress method, so you can feed CloudFront / R2 / imgix access-log entries through the same sinks. When the event omits source, it defaults to "cdn".

    import { wrapTransport, getAnalyticsTransport } from "@decoperations/s3worm-analytics";
    
    const transport = wrapTransport(inner, { sink });
    transport.recordExternalEgress({
      op: "GET",
      bucket: "primary",
      key: "users/abc/avatar.png",
      bytesIn: 0,
      bytesOut: 12_345,
      durationMs: 0,
      ok: true,
      tenant: "abc",
      action: "cdn.serve",
      timestamp: new Date().toISOString(),
    });
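
    Before you call recordExternalEgress, each access-log entry has to be turned into a UsageEvent-shaped object. The sketch below does this for a hypothetical, simplified log record. CdnLogRecord is an assumption, not CloudFront's, R2's, or imgix's real log format. The sketch reuses the users/<tenant>/ key shape from the attributeFromKey example to recover the tenant.

```typescript
// Hypothetical, simplified CDN log record; real CloudFront / R2 / imgix logs
// have their own formats and must be parsed into something like this first.
interface CdnLogRecord {
  path: string;      // URL path as logged, e.g. "/users/abc/avatar.png"
  status: number;    // HTTP status returned to the client
  bytesSent: number; // response bytes served from the edge
  time: Date;
}

// Field names follow the UsageEvent interface documented above.
interface EgressEvent {
  op: "GET";
  bucket: string;
  key: string;
  bytesIn: number;
  bytesOut: number;
  durationMs: number;
  ok: boolean;
  tenant?: string;
  action?: string;
  timestamp: string;
}

function toEgressEvent(rec: CdnLogRecord, bucket: string): EgressEvent {
  // Logged paths are URL-encoded and start with "/"; storage keys do not.
  const key = decodeURIComponent(rec.path.replace(/^\/+/, ""));
  const m = /^users\/([^/]+)\//.exec(key); // same key shape as attributeFromKey
  return {
    op: "GET",
    bucket,
    key,
    bytesIn: 0,
    bytesOut: rec.bytesSent,
    durationMs: 0,
    ok: rec.status < 400,
    tenant: m ? m[1] : undefined,
    action: "cdn.serve",
    timestamp: rec.time.toISOString(),
  };
}
```

    Each converted event can then be passed to transport.recordExternalEgress(toEgressEvent(record, "primary")).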
    

    runWithContext(ctx, fn) / getContext() / withContext(patch, fn)

    AsyncLocalStorage-backed helpers for ambient context propagation.
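
    The Node.js sketch below shows how helpers like these can sit on top of AsyncLocalStorage. The names runInContext, currentContext, and extendContext are hypothetical stand-ins for runWithContext, getContext, and withContext. The shallow-merge behavior of extendContext is an assumption about what withContext(patch, fn) does. The package's actual implementation, including its edge-runtime fallback, may differ.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical context shape: tenant / action plus arbitrary extra dimensions.
type Ctx = { tenant?: string; action?: string; [key: string]: unknown };

const storage = new AsyncLocalStorage<Ctx>();

// Run fn (and everything it schedules asynchronously) with ctx as the ambient context.
function runInContext<T>(ctx: Ctx, fn: () => T): T {
  return storage.run(ctx, fn);
}

// Read the ambient context; undefined outside any runInContext call.
function currentContext(): Ctx | undefined {
  return storage.getStore();
}

// Assumption: shallow-merge patch over the current context for the duration of fn.
// The outer context object is not modified.
function extendContext<T>(patch: Ctx, fn: () => T): T {
  const base: Ctx = storage.getStore() ?? {};
  return storage.run({ ...base, ...patch }, fn);
}
```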


    Failure semantics

    • Sink errors (sync throw or rejected Promise) are caught and routed to onError. Instrumentation never breaks the underlying operation.
    • Transport errors are re-thrown after a { ok: false, errorCode } event is recorded.
    • getPublicUrl does no I/O and, unless estimateEgressOnPublicUrl is set, emits no event.
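
    The ordering in the sink and transport rules can be sketched as a generic wrapper around one async call. The event is emitted first, then any transport error is re-thrown, and a sink failure never escapes. The instrumented function and the CallEvent type are hypothetical. errorCode is simplified to the error's name rather than the full code / Code / name lookup.

```typescript
// Hypothetical sketch of one instrumented transport call; not the package's code.
interface CallEvent {
  op: string;
  key: string;
  durationMs: number;
  ok: boolean;
  errorCode?: string;
  timestamp: string;
}

async function instrumented<T>(
  op: string,
  key: string,
  call: () => Promise<T>,
  emit: (event: CallEvent) => void,
): Promise<T> {
  const started = Date.now();
  // Settle the call into a value-or-error record. Promise.resolve().then(call)
  // also turns a synchronous throw inside `call` into a rejection.
  const outcome = await Promise.resolve()
    .then(call)
    .then(
      (value) => ({ ok: true as const, value }),
      (error: unknown) => ({ ok: false as const, error }),
    );
  let errorCode: string | undefined;
  if (!outcome.ok && outcome.error instanceof Error) errorCode = outcome.error.name;
  try {
    // The event is recorded before any rethrow.
    emit({ op, key, durationMs: Date.now() - started, ok: outcome.ok, errorCode, timestamp: new Date(started).toISOString() });
  } catch {
    // Instrumentation must never break the underlying operation.
  }
  if (!outcome.ok) throw outcome.error;
  return outcome.value;
}
```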