Skip to main content

Collection (database) Source

You can create collections with your stream worker, and store incoming data in it for further processing.

This page explains how to use collections as sources. For information about using collections as stores, refer to Tables (Collections).

note

If you want to use an existing Macrometa collection as a source, then you still need to define it in the stream worker and you must enable streaming on the collection.

Got it. Here is the updated content with the new parameter:


Syntax

CREATE SOURCE SourceName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type', skip.event.with.delete='BOOLEAN') (strings);

Parameters

ParameterDescriptionDefault ValuePossible Data TypesOptional
collectionSpecifies the name of the collection to which the source must listen.STRINGNo
replication.typeSpecifies the replication type of the collection. Must be global as local collections are not allowed.localSTRINGNo
collection.typeSpecifies the type of data the collection contains. Possible values are doc and edge.docSTRINGYes
skip.event.with.deleteDetermines whether to skip processing messages with the field _delete. If false, all messages are processed; if true, only the keys of deleted messages are returned.trueBOOLEANYes

Example 1

CREATE SOURCE SweetProductionStream WITH (type='database', collection='SweetProductionData', collection.type='doc', replication.type='GLOBAL', map.type='json', skip.event.with.delete='false') (name string, amount double);

Example 2

CREATE SOURCE account WITH (type='database', collection='account', collection.type='doc', replication.type='GLOBAL', map.type='json', skip.event.with.delete='false') (contact object, created_at string);

Example 3

-- Create Table SampleCargoAppInputTable to process events.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection ="SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream ="SampleCargoAppDestStream", replication.type="local") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;