MongoDB Sink
Overview
The MongoDB sink pulls messages from a KurrentDB stream and stores them in a collection. The records will be serialized into BSON documents, so the data must be valid for BSON format.
Quickstart
You can create the MongoDB Sink connector as follows. Replace id
with a unique connector name or ID:
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json
{
"settings": {
"instanceTypeName": "mongo-db-sink",
"connectionString": "mongodb://127.0.0.1:27020",
"database": "my_database",
"collection": "my_collection",
"subscription:filter:scope": "stream",
"subscription:filter:filterType": "streamId",
"subscription:filter:expression": "example-stream"
}
}
After creating and starting the MongoDB sink connector, every time an event is appended to the example-stream
, the MongoDB sink connector will send the record to the specified collection in the database. You can find a list of available management API endpoints in the API Reference.
Settings
Adjust these settings to specify the behavior and interaction of your MongoDB sink connector with KurrentDB, ensuring it operates according to your requirements and preferences.
Tips
The MongoDB sink inherits a set of common settings that are used to configure the connector. The settings can be found in the Sink Options page.
The MongoDB sink can be configured with the following options:
Name | Details |
---|---|
database | required Description: The name of the database where the records will be stored. |
collection | required Description: The collection name that resides in the database to push records to. |
connectionString | required Description: The MongoDB URI to which the connector connects. Default: "mongodb://mongoadmin:secret@localhost:27017/admin" |
documentId:source | Description: The attribute used to generate the document id. Default: "recordId" Accepted Values: - "recordId" , "stream" , "headers" , or "streamSuffix" |
documentId:expression | Description: The expression used to format the document id based on the selected source. This allows for custom id generation logic. Default: "250" |
certificate:rawData | Description: Base64 encoded x509 certificate. Default: "" |
certificate:password | Description: The password used to access the x509 certificate for secure connections. Default: "" |
batching:batchSize | Description: Threshold batch size at which the sink will push the batch of records to the MongoDB collection. Default: "1000" |
batching:batchTimeoutMs | Description: Threshold time in milliseconds at which the sink will push the current batch of records to the MongoDB collection, regardless of the batch size. Default: "250" |
Resilience options can be found in the Resilience Configuration section.
Metadata
The MongoDB sink connector automatically includes these default headers in each document sent to the collection. These headers provide metadata about the event and are stored in a _metadata
field within the document.
Examples
Authentication
This MongoDB sink connector currently only supports SCRAM and X.509 certificate authentication.
SCRAM
To use SCRAM for authentication, include the username and password in the connection string and set the authMechanism
parameter in the connection string to either SCRAM-SHA-1
or SCRAM-SHA-256
to select the desired MongoDB authentication mechanism. For more explanations on the connection string URI refer to the official MongoDB documentation on Authentication Mechanism.
Note
MongoDB version 4.0 and later uses SCRAM-SHA-256 as the default authentication mechanism if the MongoDB server version supports it.
X.509 certificate authentication
To use X.509 certificate authentication, include the base64 encoded x509 certificate and the password in the settings. You can use an online tool like base64encode to encode your certificate.
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json
{
"certificate:rawData": "base64encodedstring",
"certificate:password": "password"
}
Document ID Strategy
The id of the document can be generated automatically based on the source specified and expression if needed. The following options are available:
By default, the MongoDB sink connector uses the record's Id as the document ID. This is the unique identifier generated for every record in KurrentDB.
Set Document ID using Stream ID
You can extract part of the stream name using a regular expression (regex) to define the document id. The expression is optional and can be customized based on your naming convention. In this example, the expression captures the stream name up to _data
.
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json
{
"documentId:source": "stream",
"documentId:expression": "^(.*)_data$"
}
Alternatively, if you only need the last segment of the stream name (after a hyphen), you can use the streamSuffix
source. This doesn't require an expression since it automatically extracts the suffix.
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json
{
"documentId:source": "streamSuffix"
}
The streamSuffix
source is useful when stream names follow a structured format, and you want to use only the trailing part as the document ID. For example, if the stream is named user-123
, the document ID would be 123
.
Set Document ID from Headers
You can create the document ID by combining values from a record's metadata.
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json
{
"documentId:source": "headers",
"documentId:expression": "key1,key2"
}
Specify the header keys you want to use in the documentId:expression
field (e.g., key1,key2
). The connector will concatenate the header values with a hyphen (-
) to create the partition key.
For example, if your event has headers key1: regionA
and key2: zone1
, the partition key will be regionA-zone1
.
Tutorial
Learn how to set up and use a MongoDB Sink connector in KurrentDB through a tutorial.