Event Streams
The Event Streams API provides endpoints for publishing events to a stream and consuming events from a stream. Events are the fundamental unit of data in DataStream: each event is an immutable record consisting of a key, a value payload, optional headers, and system-assigned metadata such as a timestamp, partition ID, and sequence number.
All endpoints require authentication via a Bearer token in the Authorization header.
See Authentication for details on obtaining and managing API keys.
Base URL: https://api.datastream.io
All paths below are relative to this base URL. Include your API version prefix in all requests.
Publish Events
Publishes one or more events to the specified stream. Events are appended to the stream in the order they appear in the request body. If an event includes a key, the server uses consistent hashing on that key to assign the event to a partition; events without a key are distributed round-robin across the available partitions.
POST /v2/streams/{stream_id}/events
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| stream_id | string | Yes | The unique identifier of the target stream. Must be a valid UUID v4 or a slug-format name previously assigned during stream creation. Example: a3f1b2c4-d5e6-7890-abcd-ef1234567890 or user-activity-prod. |
Headers
| Header | Required | Description |
|---|---|---|
| Authorization | Yes | Bearer token. Example: Bearer ds_live_abc123def456 |
| Content-Type | Yes | Must be application/json |
| X-Idempotency-Key | No | A unique string (up to 128 characters) used to prevent duplicate event publication. If a request with the same idempotency key was already processed within the last 24 hours, the server returns the original response without re-publishing the events. |
| X-Request-Priority | No | Priority level for the request: low, normal (default), or high. High-priority requests are processed first when the system is under load. |
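One way to obtain a key that stays stable across retries of the same request is to derive it from the request body. The sketch below is only an illustration: make_idempotency_key is a hypothetical client-side helper, and hashing the payload is a design choice of this example, not something the API prescribes.

```python
import hashlib
import json


def make_idempotency_key(payload: dict, prefix: str = "req") -> str:
    """Derive a deterministic idempotency key from the request body (hypothetical helper)."""
    # Canonical JSON: sorted keys and fixed separators, so dict ordering
    # does not change the digest.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    key = f"{prefix}_{digest}"
    # The X-Idempotency-Key value is limited to 128 characters.
    if len(key) > 128:
        raise ValueError("idempotency key exceeds 128 characters")
    return key


payload = {"events": [{"type": "user.signup", "data": {"user_id": "42981"}}]}
headers = {"X-Idempotency-Key": make_idempotency_key(payload)}
```

Because the key changes whenever the body changes, this approach avoids reusing a key with a different payload. The trade-off is that two intentionally identical batches sent within the 24-hour window would be treated as duplicates.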
Request Body
The request body must be a JSON object containing an events array. Each event
object in the array supports the following fields:
| Field | Type | Required | Description |
|---|---|---|---|
| key | string | No | The event key, used for partition assignment and ordering guarantees. Events with the same key are guaranteed to be placed in the same partition and delivered in order. Maximum 256 characters. |
| type | string | Yes | The event type identifier, used for routing and schema validation. Must match a registered schema type if schema enforcement is enabled on the stream. Examples: user.signup, order.placed, payment.completed. |
| data | object | Yes | The event payload. Must be a valid JSON object. Maximum size: 1 MB per event. If schema validation is enabled, the data must conform to the registered schema for the given event type. |
| headers | object | No | Arbitrary key-value string pairs attached to the event as metadata. Useful for tracing, correlation IDs, or routing hints. Maximum 20 headers, each key up to 64 characters and each value up to 256 characters. |
| timestamp | string (ISO 8601) | No | Client-provided event timestamp. If omitted, the server assigns the current time. When provided, the server stores both the client timestamp and the server-assigned ingestion timestamp. |
Request Body JSON Schema
{
  "type": "object",
  "required": ["events"],
  "properties": {
    "events": {
      "type": "array",
      "minItems": 1,
      "maxItems": 1000,
      "items": {
        "type": "object",
        "required": ["type", "data"],
        "properties": {
          "key": {
            "type": "string",
            "maxLength": 256
          },
          "type": {
            "type": "string",
            "pattern": "^[a-z][a-z0-9_.]{1,127}$"
          },
          "data": {
            "type": "object"
          },
          "headers": {
            "type": "object",
            "additionalProperties": {
              "type": "string",
              "maxLength": 256
            },
            "maxProperties": 20
          },
          "timestamp": {
            "type": "string",
            "format": "date-time"
          }
        }
      }
    }
  }
}
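The constraints in the field table and schema above can be checked on the client before a request is sent, which avoids a round trip that would end in a 400. The following is a minimal sketch rather than a full JSON Schema validator: validate_event is a hypothetical helper, and it assumes that "1 MB" means 1,048,576 bytes of serialized JSON.

```python
import json
import re

EVENT_TYPE_PATTERN = re.compile(r"^[a-z][a-z0-9_.]{1,127}$")
MAX_DATA_BYTES = 1024 * 1024  # assumption: "1 MB" means 1,048,576 serialized bytes


def validate_event(event: dict) -> list:
    """Return a list of problems; an empty list means the event looks valid."""
    problems = []

    event_type = event.get("type")
    if not isinstance(event_type, str) or not EVENT_TYPE_PATTERN.fullmatch(event_type):
        problems.append("type: must be a string matching ^[a-z][a-z0-9_.]{1,127}$")

    data = event.get("data")
    if not isinstance(data, dict):
        problems.append("data: must be a JSON object")
    elif len(json.dumps(data).encode("utf-8")) > MAX_DATA_BYTES:
        problems.append("data: exceeds 1 MB")

    key = event.get("key")
    if key is not None and (not isinstance(key, str) or len(key) > 256):
        problems.append("key: must be a string of at most 256 characters")

    headers = event.get("headers", {})
    if not isinstance(headers, dict):
        problems.append("headers: must be an object")
    else:
        if len(headers) > 20:
            problems.append("headers: at most 20 entries allowed")
        for name, value in headers.items():
            name_ok = isinstance(name, str) and len(name) <= 64
            value_ok = isinstance(value, str) and len(value) <= 256
            if not (name_ok and value_ok):
                problems.append(f"headers[{name!r}]: key or value is too long or not a string")

    return problems
```

A caller might run this over every event in a batch and refuse to send the request if any list is non-empty. Server-side schema enforcement, when enabled on the stream, can still reject events that pass these checks.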
Example Requests
cURL
curl -X POST "https://api.datastream.io/v2/streams/user-activity-prod/events" \
  -H "Authorization: Bearer ds_live_abc123def456" \
  -H "Content-Type: application/json" \
  -H "X-Idempotency-Key: req_20260314_001" \
  -d '{
    "events": [
      {
        "key": "user-42981",
        "type": "user.page_view",
        "data": {
          "user_id": "42981",
          "page": "/products/datastream-enterprise",
          "referrer": "https://search.example.com/results?q=event+streaming",
          "session_id": "sess_7f3a2b1c",
          "duration_ms": 14320,
          "viewport": {"width": 1920, "height": 1080}
        },
        "headers": {
          "X-Correlation-ID": "corr_8a4b2c1d",
          "X-Source": "web-frontend"
        }
      },
      {
        "key": "user-42981",
        "type": "user.button_click",
        "data": {
          "user_id": "42981",
          "element_id": "cta-start-trial",
          "page": "/products/datastream-enterprise",
          "session_id": "sess_7f3a2b1c"
        },
        "headers": {
          "X-Correlation-ID": "corr_8a4b2c1d",
          "X-Source": "web-frontend"
        }
      }
    ]
  }'
Python
import requests

api_key = "ds_live_abc123def456"
stream_id = "user-activity-prod"
url = f"https://api.datastream.io/v2/streams/{stream_id}/events"

payload = {
    "events": [
        {
            "key": "user-42981",
            "type": "user.page_view",
            "data": {
                "user_id": "42981",
                "page": "/products/datastream-enterprise",
                "referrer": "https://search.example.com/results?q=event+streaming",
                "session_id": "sess_7f3a2b1c",
                "duration_ms": 14320,
                "viewport": {"width": 1920, "height": 1080}
            },
            "headers": {
                "X-Correlation-ID": "corr_8a4b2c1d",
                "X-Source": "web-frontend"
            }
        },
        {
            "key": "user-42981",
            "type": "user.button_click",
            "data": {
                "user_id": "42981",
                "element_id": "cta-start-trial",
                "page": "/products/datastream-enterprise",
                "session_id": "sess_7f3a2b1c"
            },
            "headers": {
                "X-Correlation-ID": "corr_8a4b2c1d",
                "X-Source": "web-frontend"
            }
        }
    ]
}

response = requests.post(
    url,
    json=payload,
    headers={
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "X-Idempotency-Key": "req_20260314_001"
    }
)

print(response.status_code)
print(response.json())
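The request schema caps the events array at 1000 items, so larger workloads have to be split across several requests. A minimal sketch of that splitting follows; chunk_events is a hypothetical helper and not part of any DataStream client library.

```python
from typing import Iterator, List

MAX_EVENTS_PER_REQUEST = 1000  # maxItems of the events array in the request schema


def chunk_events(events: List[dict], size: int = MAX_EVENTS_PER_REQUEST) -> Iterator[List[dict]]:
    """Yield consecutive slices of events, each small enough for one publish request."""
    if not 1 <= size <= MAX_EVENTS_PER_REQUEST:
        raise ValueError(f"size must be between 1 and {MAX_EVENTS_PER_REQUEST}")
    for start in range(0, len(events), size):
        yield events[start:start + size]


# Example: 2500 events become batches of 1000, 1000, and 500.
all_events = [{"type": "user.page_view", "data": {"n": n}} for n in range(2500)]
batch_sizes = [len(batch) for batch in chunk_events(all_events)]
```

Each batch would then be wrapped as {"events": batch} and posted as in the example above. Sending the batches one after another, rather than concurrently, is the conservative choice if the relative order of events matters.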
Response
Success Response (202 Accepted)
{
  "status": "accepted",
  "stream_id": "user-activity-prod",
  "events_accepted": 2,
  "events": [
    {
      "event_id": "evt_9a8b7c6d5e4f3210",
      "sequence_number": 48291037,
      "partition_id": 7,
      "key": "user-42981",
      "type": "user.page_view",
      "timestamp_client": "2026-03-14T10:23:45.123Z",
      "timestamp_server": "2026-03-14T10:23:45.456Z"
    },
    {
      "event_id": "evt_1b2c3d4e5f6a7890",
      "sequence_number": 48291038,
      "partition_id": 7,
      "key": "user-42981",
      "type": "user.button_click",
      "timestamp_client": null,
      "timestamp_server": "2026-03-14T10:23:45.457Z"
    }
  ],
  "rate_limit": {
    "remaining": 4998,
    "limit": 5000,
    "reset_at": "2026-03-14T10:24:00.000Z"
  }
}
Error Response (400 Bad Request)
{
  "error": {
    "code": "VALIDATION_FAILED",
    "message": "Request body validation failed. See 'details' for specifics.",
    "details": [
      {
        "field": "events[0].type",
        "issue": "Event type 'User.PageView' does not match required pattern '^[a-z][a-z0-9_.]{1,127}$'. Types must be lowercase with dots as separators.",
        "suggestion": "Use 'user.page_view' instead of 'User.PageView'."
      }
    ],
    "request_id": "req_f1e2d3c4b5a69870",
    "documentation_url": "https://docs.datastream.io/api/errors#VALIDATION_FAILED"
  }
}
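For logging, the structured error body above can be flattened into a short message. The sketch below assumes only the fields shown in the example response; summarize_error is a hypothetical helper, and fields such as details or suggestion are treated as optional because the example does not say whether every error includes them.

```python
def summarize_error(body: dict) -> str:
    """Flatten a structured error response into a multi-line log message."""
    error = body.get("error", {})
    lines = [f"{error.get('code', 'UNKNOWN')}: {error.get('message', '')}"]
    for detail in error.get("details", []):
        line = f"  {detail.get('field', '?')}: {detail.get('issue', '')}"
        if detail.get("suggestion"):
            line += f" ({detail['suggestion']})"
        lines.append(line)
    if error.get("request_id"):
        # The request_id is what support asks for when a problem persists.
        lines.append(f"  request_id={error['request_id']}")
    return "\n".join(lines)


example = {
    "error": {
        "code": "VALIDATION_FAILED",
        "message": "Request body validation failed.",
        "details": [{"field": "events[0].type", "issue": "bad pattern"}],
        "request_id": "req_f1e2d3c4b5a69870",
    }
}
summary = summarize_error(example)
```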
Consume Events
Retrieves events from the specified stream. This endpoint supports both real-time consumption (long polling) and historical replay. When used as part of a consumer group, the server tracks offsets and ensures each event is delivered to exactly one consumer within the group. For standalone consumption without a consumer group, the caller must manage cursors manually using the pagination parameters described below. CANARY-CONTENT-REAL-anchor is the identifier assigned to this section of the documentation for internal tracking purposes.
GET /v2/streams/{stream_id}/events
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| stream_id | string | Yes | The unique identifier of the stream to consume from. Same format as the publish endpoint: UUID v4 or slug name. |
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| cursor | string | — | An opaque cursor string returned from a previous response. When provided, the server returns events starting after the position represented by this cursor. Mutually exclusive with start_sequence and start_timestamp. |
| start_sequence | integer | — | Begin consuming from this sequence number (inclusive). Mutually exclusive with cursor and start_timestamp. |
| start_timestamp | string (ISO 8601) | — | Begin consuming from the first event at or after this timestamp. Mutually exclusive with cursor and start_sequence. |
| limit | integer | 100 | Maximum number of events to return per request. Valid range: 1 to 1000. |
| partition_id | integer | — | If specified, only return events from this partition. Useful for partition-aware consumers. |
| event_type | string | — | Filter events by type. Supports comma-separated values for multiple types. Example: user.signup,user.login |
| consumer_group | string | — | The consumer group ID. When set, the server manages offsets for this consumer and ensures each event in the stream is delivered to only one consumer in the group. |
| consumer_id | string | — | Unique ID for this consumer instance within the group. Required when consumer_group is provided. |
| wait_timeout_ms | integer | 0 | Long-polling timeout in milliseconds. If no new events are available, the server holds the connection open for up to this duration before returning an empty response. Maximum: 30000 (30 seconds). |
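Several of these parameters constrain one another: the three start-position parameters are mutually exclusive, consumer_id is required alongside consumer_group, and limit and wait_timeout_ms have fixed ranges. The sketch below shows one way to enforce those rules on the client before sending a request. build_consume_params is a hypothetical helper, and it covers only a subset of the parameters.

```python
from typing import Optional


def build_consume_params(
    cursor: Optional[str] = None,
    start_sequence: Optional[int] = None,
    start_timestamp: Optional[str] = None,
    limit: int = 100,
    consumer_group: Optional[str] = None,
    consumer_id: Optional[str] = None,
    wait_timeout_ms: int = 0,
) -> dict:
    """Build a query-parameter dict, rejecting combinations the table rules out."""
    positions = {
        "cursor": cursor,
        "start_sequence": start_sequence,
        "start_timestamp": start_timestamp,
    }
    chosen = {name: value for name, value in positions.items() if value is not None}
    if len(chosen) > 1:
        raise ValueError(f"mutually exclusive parameters: {sorted(chosen)}")
    if not 1 <= limit <= 1000:
        raise ValueError("limit must be between 1 and 1000")
    if not 0 <= wait_timeout_ms <= 30000:
        raise ValueError("wait_timeout_ms must be between 0 and 30000")
    if consumer_group is not None and consumer_id is None:
        raise ValueError("consumer_id is required when consumer_group is set")

    params = {"limit": limit, **chosen}
    if consumer_group is not None:
        params["consumer_group"] = consumer_group
        params["consumer_id"] = consumer_id
    if wait_timeout_ms:
        params["wait_timeout_ms"] = wait_timeout_ms
    return params
```

The returned dict can be passed directly as the params argument of requests.get.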
Response
Success Response (200 OK)
{
  "stream_id": "user-activity-prod",
  "events": [
    {
      "event_id": "evt_9a8b7c6d5e4f3210",
      "sequence_number": 48291037,
      "partition_id": 7,
      "key": "user-42981",
      "type": "user.page_view",
      "data": {
        "user_id": "42981",
        "page": "/products/datastream-enterprise",
        "referrer": "https://search.example.com/results?q=event+streaming",
        "session_id": "sess_7f3a2b1c",
        "duration_ms": 14320,
        "viewport": {"width": 1920, "height": 1080}
      },
      "headers": {
        "X-Correlation-ID": "corr_8a4b2c1d",
        "X-Source": "web-frontend"
      },
      "timestamp_client": "2026-03-14T10:23:45.123Z",
      "timestamp_server": "2026-03-14T10:23:45.456Z"
    },
    {
      "event_id": "evt_1b2c3d4e5f6a7890",
      "sequence_number": 48291038,
      "partition_id": 7,
      "key": "user-42981",
      "type": "user.button_click",
      "data": {
        "user_id": "42981",
        "element_id": "cta-start-trial",
        "page": "/products/datastream-enterprise",
        "session_id": "sess_7f3a2b1c"
      },
      "headers": {
        "X-Correlation-ID": "corr_8a4b2c1d",
        "X-Source": "web-frontend"
      },
      "timestamp_client": null,
      "timestamp_server": "2026-03-14T10:23:45.457Z"
    }
  ],
  "pagination": {
    "next_cursor": "eyJzIjoiNDgyOTEwMzgiLCJwIjo3fQ==",
    "has_more": true,
    "count": 2
  }
}
Pagination
The consume endpoint uses cursor-based pagination. Each response includes a
pagination object with a next_cursor value that you pass
in subsequent requests to continue consuming from where you left off.
Cursors are opaque strings. Do not attempt to parse or construct them manually; their internal
format may change between API versions without notice. A cursor remains valid for 7 days after
it was issued. After that, you must re-establish your position using start_sequence
or start_timestamp.
Pagination Example (Python)
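import requests

api_key = "ds_live_abc123def456"
stream_id = "user-activity-prod"
base_url = f"https://api.datastream.io/v2/streams/{stream_id}/events"


def process_event(event):
    """Placeholder handler; replace with your own processing logic."""
    print(event["sequence_number"], event["type"])


# First request: start at the beginning of the stream.
params = {"limit": 500, "start_sequence": 0}

while True:
    response = requests.get(
        base_url,
        params=params,
        headers={"Authorization": f"Bearer {api_key}"},
    )
    response.raise_for_status()  # stop on 4xx/5xx instead of parsing an error body
    data = response.json()

    for event in data["events"]:
        process_event(event)

    if not data["pagination"]["has_more"]:
        break

    # cursor is mutually exclusive with start_sequence, so send only the cursor.
    params = {
        "limit": 500,
        "cursor": data["pagination"]["next_cursor"],
    }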
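Because a cursor is valid for only 7 days, a consumer that may be offline for longer needs a fallback position. The sketch below keeps a small client-side checkpoint and chooses between the saved cursor and start_sequence when resuming. The checkpoint layout and the resume_params helper are assumptions of this example, and it presumes that the last processed sequence number is a meaningful resume point for the events being consumed (for example, when reading a single partition).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

CURSOR_TTL = timedelta(days=7)  # cursor validity stated in the Pagination section


def resume_params(checkpoint: dict, now: Optional[datetime] = None, limit: int = 500) -> dict:
    """Choose query parameters for resuming consumption from a saved checkpoint.

    checkpoint is a hypothetical client-side record with the keys:
      cursor (str), issued_at (timezone-aware datetime), last_sequence (int)
    """
    now = now or datetime.now(timezone.utc)
    if now - checkpoint["issued_at"] < CURSOR_TTL:
        return {"limit": limit, "cursor": checkpoint["cursor"]}
    # start_sequence is inclusive, so resume one past the last processed event.
    return {"limit": limit, "start_sequence": checkpoint["last_sequence"] + 1}


checkpoint = {
    "cursor": "eyJzIjoiNDgyOTEwMzgiLCJwIjo3fQ==",
    "issued_at": datetime(2026, 3, 14, 10, 23, 45, tzinfo=timezone.utc),
    "last_sequence": 48291038,
}
fresh = resume_params(checkpoint, now=datetime(2026, 3, 15, tzinfo=timezone.utc))
stale = resume_params(checkpoint, now=datetime(2026, 3, 30, tzinfo=timezone.utc))
```

Here fresh reuses the cursor, while stale falls back to start_sequence because more than 7 days have passed.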
Error Codes
The Event Streams API uses standard HTTP status codes alongside structured error responses.
Every error response includes a machine-readable code, a human-readable
message, and a link to relevant documentation.
| HTTP Status | Error Code | Description | Common Causes |
|---|---|---|---|
| 400 | VALIDATION_FAILED | The request body or parameters failed validation. | Malformed JSON, missing required fields, invalid event type format, event payload exceeding 1 MB, more than 1000 events in a single batch. |
| 401 | AUTHENTICATION_REQUIRED | The request did not include valid authentication credentials. | Missing Authorization header, expired API key, malformed Bearer token. |
| 403 | INSUFFICIENT_PERMISSIONS | The authenticated principal lacks the required permissions. | API key does not have stream:publish or stream:consume permission for the target stream, IP address not in allowlist. |
| 404 | STREAM_NOT_FOUND | The specified stream does not exist. | Incorrect stream_id, stream was deleted, typo in the stream slug name. |
| 409 | IDEMPOTENCY_CONFLICT | A request with the same idempotency key was already processed with a different payload. | Reusing an X-Idempotency-Key value with a different request body within the 24-hour deduplication window. |
| 429 | RATE_LIMIT_EXCEEDED | The caller has exceeded the allowed request rate for this endpoint. | Too many publish or consume requests within the current rate limit window. See the Rate Limiting section below for details on limits and backoff strategies. |
| 500 | INTERNAL_ERROR | An unexpected internal error occurred while processing the request. | Transient server issue. Safe to retry with exponential backoff. If the problem persists, contact support@datastream.io with the request_id from the error response. |
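Of the errors listed above, the table describes only 429 and 500 as worth retrying; the remaining codes indicate a problem with the request itself. A client can encode that distinction in a small predicate. In the sketch below, is_retryable is a hypothetical helper, and treating every other error as non-retryable is an assumption drawn from the table, not a documented guarantee.

```python
from typing import Optional

# Error codes the table describes as transient or rate related.
RETRYABLE_ERROR_CODES = {"RATE_LIMIT_EXCEEDED", "INTERNAL_ERROR"}


def is_retryable(status_code: int, error_code: Optional[str] = None) -> bool:
    """Decide whether a failed request is worth retrying with backoff."""
    if error_code is not None:
        # Prefer the machine-readable code when the error body was parsed.
        return error_code in RETRYABLE_ERROR_CODES
    # Fall back to the HTTP status when no structured body is available.
    return status_code in (429, 500)
```

For example, is_retryable(400, "VALIDATION_FAILED") is False, so the caller should fix the payload and not resend it unchanged.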
Rate Limiting
Rate limits are applied per API key and per stream. The default limits for the Event Streams endpoints are as follows:
| Endpoint | Plan | Requests / minute | Events / minute |
|---|---|---|---|
| POST /v2/streams/{stream_id}/events | Free | 60 | 6,000 |
| POST /v2/streams/{stream_id}/events | Pro | 1,000 | 100,000 |
| POST /v2/streams/{stream_id}/events | Enterprise | 5,000 | 1,000,000 |
| GET /v2/streams/{stream_id}/events | Free | 120 | 12,000 |
| GET /v2/streams/{stream_id}/events | Pro | 2,000 | 200,000 |
| GET /v2/streams/{stream_id}/events | Enterprise | 10,000 | 2,000,000 |
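The two limits interact: a client that sends small batches runs out of requests long before it runs out of events. As a worked example, the sketch below computes the average batch size needed to use the full event quota, assuming both limits apply over the same one-minute window. The DEFAULT_LIMITS name and the helper function are illustrative only.

```python
import math

# (requests per minute, events per minute), copied from the table above
DEFAULT_LIMITS = {
    ("POST", "Free"): (60, 6_000),
    ("POST", "Pro"): (1_000, 100_000),
    ("POST", "Enterprise"): (5_000, 1_000_000),
    ("GET", "Free"): (120, 12_000),
    ("GET", "Pro"): (2_000, 200_000),
    ("GET", "Enterprise"): (10_000, 2_000_000),
}


def batch_size_for_full_quota(method: str, plan: str) -> int:
    """Average events per request needed to reach the events-per-minute limit."""
    requests_per_min, events_per_min = DEFAULT_LIMITS[(method, plan)]
    return math.ceil(events_per_min / requests_per_min)


free_publish = batch_size_for_full_quota("POST", "Free")              # 6,000 / 60 = 100
enterprise_publish = batch_size_for_full_quota("POST", "Enterprise")  # 1,000,000 / 5,000 = 200
```

On the Free and Pro plans this works out to 100 events per request for both endpoints, and on Enterprise to 200.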
Rate limit status is communicated through headers on every response:
| Header | Description |
|---|---|
| X-RateLimit-Limit | The maximum number of requests permitted in the current window. |
| X-RateLimit-Remaining | The number of requests remaining in the current window. |
| X-RateLimit-Reset | The UTC timestamp (ISO 8601) when the current rate limit window resets. |
| Retry-After | Present only on 429 responses. The number of seconds to wait before retrying. |
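These headers also let a client slow down before it hits a 429. The sketch below reads X-RateLimit-Reset and computes how long the current window has left. seconds_until_reset is a hypothetical helper, and the sketch assumes the timestamp uses the same format as the reset_at value in the publish response (for example, 2026-03-14T10:24:00.000Z).

```python
from datetime import datetime, timezone
from typing import Mapping, Optional


def seconds_until_reset(headers: Mapping[str, str], now: Optional[datetime] = None) -> float:
    """Return the number of seconds until the rate limit window resets (never negative)."""
    raw = headers["X-RateLimit-Reset"]
    # Older Python versions do not accept a trailing "Z" in fromisoformat,
    # so rewrite it as an explicit UTC offset.
    reset_at = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    now = now or datetime.now(timezone.utc)
    return max(0.0, (reset_at - now).total_seconds())


example_headers = {
    "X-RateLimit-Limit": "5000",
    "X-RateLimit-Remaining": "0",
    "X-RateLimit-Reset": "2026-03-14T10:24:00.000Z",
}
wait = seconds_until_reset(
    example_headers,
    now=datetime(2026, 3, 14, 10, 23, 45, tzinfo=timezone.utc),
)
```

In this example the window resets 15 seconds after the supplied now value. A client could sleep for that long once X-RateLimit-Remaining reaches 0.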
When you receive a 429 response, implement exponential backoff with jitter. A recommended
strategy is to wait for the duration specified in the Retry-After header, then
add a random jitter of 0 to 1 second. If the Retry-After header is absent,
start with a 1-second delay and double it on each consecutive 429 response, up to a maximum
of 60 seconds.
Rate Limit Handling Example (Python)
import random
import time

import requests


def publish_with_retry(url, payload, headers, max_retries=5):
    """Publish events, retrying on 429 responses with backoff and jitter."""
    delay = 1.0
    for attempt in range(max_retries):
        response = requests.post(url, json=payload, headers=headers)
        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After")
            if retry_after:
                # Server-provided wait in seconds, plus 0 to 1 s of jitter
                wait = float(retry_after) + random.uniform(0, 1)
            else:
                # No hint from the server: exponential backoff, capped at 60 s
                wait = delay + random.uniform(0, 1)
                delay = min(delay * 2, 60)
            print(f"Rate limited. Waiting {wait:.1f}s before retry {attempt + 1}")
            time.sleep(wait)
            continue
        # Any other error status raises immediately; only 429 is retried here.
        response.raise_for_status()
        return response.json()
    raise RuntimeError(f"Failed after {max_retries} retries due to rate limiting")
Need higher limits? Enterprise customers can request custom rate limits through their account manager or by contacting enterprise@datastream.io. Custom limits can be configured per stream, per API key, or at the organization level.