VStream
Change event streams
Vitess Gateways (vtgate) provide a VStream service that allows clients to subscribe to a change event stream for a set of tables.
Use Cases #
- Change Data Capture (CDC): VStream can be used to capture changes to a table and send them to a downstream system. This is useful for building real-time data pipelines.
Overview #
VStream supports copying the current contents of a table (you will often not have binary logs going back to the creation of the table) and then streaming new changes to the table from that point on. It supports resuming this initial copy phase if it is interrupted for any reason.
It also handles resharding events automatically. If the VStream is connected throughout, it transitions from the old shards to the new ones when traffic is switched (SwitchTraffic or ReverseTraffic). If you were not connected but reconnect after traffic is switched and before the old shards are removed, it catches up on any missed changes on the old shards before seamlessly transitioning to the new shards and continuing to stream all changes made there.
Events in the stream are MySQL row-based binary log events with extended metadata, and can be processed by event bridges that support Vitess, such as Debezium. Other products such as Airbyte can also be used with custom Vitess connectors.
API Details #
VStream is a gRPC method that is part of the vtgate service and is accessible via a vtgate process's --grpc_port.
RPC Parameters #
Context #
Type Context
Required
Default none
In addition to the typical Context usage, it can contain a custom key-value pair where the key is 1 and the value is a CallerID. This value is then passed along to tablets to identify the originating client for the request. It is not meant to be secure; it is primarily informational. The client can provide whatever information it wants in the CallerID fields and the servers will trust it, as this information is primarily used to aid in monitoring and debugging. The vtgate propagates the value to the source vttablet processes, and the tablets may use it for various monitoring, metrics, and logging purposes. It can, however, also be used for other purposes, such as denying the client access to tables during a migration (MoveTables or Reshard).
TabletType #
Type TabletType
Required
Default UNKNOWN (you must specify a valid type)
The tablet type to use when selecting stream source tablets.
VGtid #
Type VGtid
Required
The keyspace, shard, and GTID position list to start streaming from. If no ShardGtid.Gtid value is provided, a table copy phase is initiated for the tables matched by the provided filter on the given shard.
If the ShardGtid.Shard value is omitted, all shards in the keyspace specified in the ShardGtid.Keyspace value are included.
Additionally, if the ShardGtid.Keyspace value has a / prefix, it is interpreted as a regular expression, so a value such as /.* includes all keyspaces.
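The keyspace rule above can be sketched in a few lines of Go. This is only an illustration of the described behavior, not code from Vitess: selectKeyspaces is a hypothetical helper, and the unanchored regular expression match is an assumption.

```go
package main

import (
	"regexp"
	"strings"
)

// selectKeyspaces returns the keyspaces that a ShardGtid.Keyspace value
// would cover under the rule described above. It is a hypothetical helper
// written for illustration, not a function from the Vitess codebase.
func selectKeyspaces(value string, all []string) ([]string, error) {
	if !strings.HasPrefix(value, "/") {
		// No "/" prefix: the value names exactly one keyspace.
		return []string{value}, nil
	}
	// "/" prefix: treat the remainder as a regular expression.
	// Assumption: the match is unanchored, as with regexp.MatchString.
	re, err := regexp.Compile(strings.TrimPrefix(value, "/"))
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, ks := range all {
		if re.MatchString(ks) {
			matched = append(matched, ks)
		}
	}
	return matched, nil
}
```

With the keyspaces commerce and customer, the value /.* selects both, while the value commerce selects only that keyspace.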
Filter #
Type Filter
Required
The tables for which you want to subscribe to change events, in the keyspace(s) and shard(s) contained in the provided VGtid, and any query predicates to use when filtering the rows for which change events will be generated.
VStreamFlags #
Cells #
Type string
Default ""
If specified, these cells (comma-separated list) are used when selecting stream source tablets. When no value is specified the vtgate will default to looking for source tablets within its own local cell.
CellPreference #
Type string
Default ""
If specified, this determines which cells to give preference to during tablet selection.
By default, preferlocalwithalias is used in order to give preference to the caller's local cell and then any alias its cell belongs to.
If onlyspecified is given, then only tablets within the specified Cells field value will be considered.
HeartbeatInterval #
Type unsigned integer
Default 0 (none)
How frequently, in seconds, to send heartbeat events to the client when there are no other events in the stream to send.
IncludeReshardJournalEvents #
Type bool
Default false
When enabled the vtgate will include reshard journal events in the stream along with all other events.
MinimizeSkew #
Type bool
Default false
When enabled the vtgate, which aggregates the streams coming from each of the shards involved, will keep the events in the stream roughly time aligned. It uses the event timestamps to ensure the maximum time skew between the source tablet shard streams is kept under 10 minutes. When it detects skew between the source streams it will pause sending the client more events and allow the lagging shard(s) to catch up.
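The skew check described above can be pictured with a small Go sketch. It is one possible reading of the description, not the vtgate's actual implementation: shardsToPause is a hypothetical function, and timestamps are assumed to be Unix seconds.

```go
package main

import "sort"

// maxSkewSeconds reflects the 10 minute bound mentioned above.
const maxSkewSeconds = 10 * 60

// shardsToPause returns the shards whose most recent event timestamp is
// more than maxSkewSeconds ahead of the slowest shard. Holding those
// streams back gives the lagging shard(s) time to catch up.
// Hypothetical helper; the vtgate's real logic may differ.
func shardsToPause(latest map[string]int64) []string {
	if len(latest) == 0 {
		return nil
	}
	// Find the timestamp of the slowest (most lagging) shard.
	first := true
	var lowest int64
	for _, ts := range latest {
		if first || ts < lowest {
			lowest = ts
			first = false
		}
	}
	// Collect every shard that is too far ahead of it.
	var ahead []string
	for shard, ts := range latest {
		if ts-lowest > maxSkewSeconds {
			ahead = append(ahead, shard)
		}
	}
	sort.Strings(ahead) // deterministic order for callers
	return ahead
}
```

For example, if shard -80 is at timestamp 1000 and shard 80- is at 1601, only 80- would be held back, because it is 601 seconds ahead.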
StopOnReshard #
Type bool
Default false
When enabled the vtgate will send a reshard journal event to the client along with an EOF error in the VStreamReader.Recv response and stop sending any further events.
TabletOrder #
Type string
Default ""
This replaces the in_order hint (e.g. "in_order:REPLICA,PRIMARY") previously used to specify tablet type order during source tablet selection.
RPC Response #
The VStream gRPC returns a VStreamReader, and a non-nil error if the stream could not be initialized. You then call the Recv method on that VStreamReader in a for loop, and responses are sent when available. Each response consists of the following two values:
- An array of VEvent objects: the new messages to process in the stream
- An error: if non-nil, it indicates that the stream has been closed (EOF) or that an error occurred
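The receive loop described above can be sketched without any Vitess imports. In this sketch, event and eventReader are stand-ins for VEvent and VStreamReader, and sliceReader is a fake used only to exercise the loop. All three are assumptions for illustration. The sketch also assumes that a closed stream surfaces as io.EOF.

```go
package main

import (
	"errors"
	"io"
)

// event stands in for a VEvent so the sketch needs no Vitess imports.
type event struct {
	Type string
}

// eventReader mirrors the shape of the Recv method described above.
type eventReader interface {
	Recv() ([]event, error)
}

// drain calls Recv until the stream ends. io.EOF is treated as a clean
// close; any other error is returned to the caller.
func drain(r eventReader, handle func(event)) error {
	for {
		events, err := r.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		for _, ev := range events {
			handle(ev)
		}
	}
}

// sliceReader is a fake reader that replays fixed batches and then
// reports io.EOF. It exists only to exercise drain.
type sliceReader struct {
	batches [][]event
}

func (s *sliceReader) Recv() ([]event, error) {
	if len(s.batches) == 0 {
		return nil, io.EOF
	}
	next := s.batches[0]
	s.batches = s.batches[1:]
	return next, nil
}
```

Separating the clean-close case from other errors lets the caller decide whether to reconnect from the last position it saw or to give up.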
API Types #
Example Usage #
You can find a full example Go client here.
Below is a snippet showing how to use the VStream API in Go:
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
if err != nil {
	t.Fatal(err)
}
defer gconn.Close()

// lastPK is id1=4
lastPK := sqltypes.Result{
	Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
	Rows:   [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
	TableName: "t1",
	Lastpk:    sqltypes.ResultToProto3(&lastPK),
}}

// One ShardGtid per shard, each with its own GTID position.
var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
	Keyspace: "ks",
	Shard:    "-80",
	Gtid:     "MySQL56/89f66ef2-863a-11ed-9bdf-3d270fd3f552:1-30219",
	TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
	Keyspace: "ks",
	Shard:    "80-",
	Gtid:     "MySQL56/2174b383-5441-11e8-b90a-c80aa9429562:1-29516,24da167-0c0c-11e8-8442-00059a3c7b00:1-19",
	TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids

// Subscribe to all rows of table t1.
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match:  "t1",
		Filter: "select * from t1",
	}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
	t.Fatal(err)
}
var evs []*binlogdatapb.VEvent
for {
	e, err := reader.Recv()
	...
Copy All Tables From All Shards in the ks Keyspace #
Below is a snippet in Go that demonstrates how to copy from all shards by omitting ShardGtid.Shard:
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "ks",
		Gtid:     "",
	}},
}
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match: "/.*",
	}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
Copy All Tables From All Shards in All Keyspaces #
Below is a snippet in Go that demonstrates how to copy from all keyspaces by specifying /.* as the value for ShardGtid.Keyspace:
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "/.*",
		Gtid:     "",
	}},
}
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match: "/.*",
	}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
Debugging #
There is also an SQL interface that can be used for testing and debugging from a vtgate. Here's an example:
$ mysql --quick <vtgate params>
mysql> SET WORKLOAD=OLAP;
mysql> VSTREAM * FROM commerce.corder\G
*************** 1. row ***************
op: +
order_id: 1
customer_id: 1
sku: NULL
price: 10
************** 2. row ***************
op: *
order_id: 1
customer_id: 1
sku: NULL
price: 7
************** 3. row ***************
op: -
order_id: 1
customer_id: 1
sku: NULL
price: 7
…
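When post-processing output like the above in a script, it can help to turn the op column into a readable label. The mapping in this sketch (+ for insert, * for update, - for delete) is inferred from the sample rows rather than stated on this page, so treat it as an assumption. describeOp is a hypothetical helper.

```go
package main

import "fmt"

// describeOp maps an op column value from the VSTREAM SQL output to a
// readable label. The mapping is inferred from the sample output above
// and is an assumption, not a documented contract.
func describeOp(op string) (string, error) {
	switch op {
	case "+":
		return "insert", nil
	case "*":
		return "update", nil
	case "-":
		return "delete", nil
	default:
		return "", fmt.Errorf("unrecognized op %q", op)
	}
}
```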
Monitoring #
VTGates publish vstream metrics listed here.
More Reading #
- VStream Copy
- VStream API and Resharding
- VStream Skew Minimization
- Debezium Connector for Vitess
- Blog posts