一个示例输出插件可以在 PostgreSQL 源代码树的 contrib/test_decoding
子目录下找到。
输出插件通过动态加载一个以输出插件名称作为库基础名称的共享库来加载。标准的库搜索路径用于定位该库。为了提供所需的输出插件回调函数并指示该库实际上是一个输出插件,它需要提供一个名为 _PG_output_plugin_init
的函数。该函数会接收一个结构体,需要用各个操作的回调函数指针来填充。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
begin_cb
、change_cb
和 commit_cb
回调函数是必需的,而 startup_cb
、truncate_cb
、message_cb
、filter_by_origin_cb
和 shutdown_cb
是可选的。如果 truncate_cb
未设置但需要解码 TRUNCATE
操作,则该操作将被忽略。
输出插件还可以定义函数来支持大型、进行中的事务的流式传输。 stream_start_cb
、stream_stop_cb
、stream_abort_cb
、stream_commit_cb
和 stream_change_cb
是必需的,而 stream_message_cb
和 stream_truncate_cb
是可选的。如果输出插件也支持两阶段提交,则 stream_prepare_cb
也是必需的。
输出插件还可以定义函数来支持两阶段提交,这允许在 PREPARE TRANSACTION
时进行解码。 begin_prepare_cb
、prepare_cb
、commit_prepared_cb
和 rollback_prepared_cb
回调函数是必需的,而 filter_prepare_cb
是可选的。如果输出插件也支持大型进行中事务的流式传输,则 stream_prepare_cb
也是必需的。
为了解码、格式化和输出更改,输出插件可以使用后端的大部分正常基础设施,包括调用输出函数。允许只读访问关系,只要访问的关系是 initdb
在 pg_catalog
模式下创建的,或者使用以下方式标记为用户提供的目录表:
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
请注意,在输出插件中访问用户目录表或常规系统目录表必须仅通过 systable_*
扫描 API 进行。通过 heap_*
扫描 API 访问将导致错误。此外,任何导致事务 ID 分配的操作都是禁止的。这包括写入表、执行 DDL 更改以及调用 pg_current_xact_id()
。
输出插件回调函数可以以几乎任意格式将数据传递给消费者。对于某些用例,例如通过 SQL 查看更改,以可以包含任意数据的数据类型(例如 bytea
)返回数据会很麻烦。如果输出插件仅输出服务器编码的文本数据,则可以通过在 startup callback 中将 OutputPluginOptions.output_type
设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
而不是 OUTPUT_PLUGIN_BINARY_OUTPUT
来声明这一点。在这种情况下,所有数据都必须是服务器编码,以便 text
数据类型可以包含它。这将在启用断言的版本中进行检查。
输出插件通过它需要提供的各种回调函数获得关于正在发生的更改的通知。
并发事务按提交顺序解码,并且仅在 begin
和 commit
回调函数之间解码属于特定事务的更改。显式或隐式回滚的事务永远不会被解码。成功的保存点按照它们在事务中执行的顺序折叠到包含它们的事务中。使用 PREPARE TRANSACTION
准备提交的两阶段事务也会被解码,前提是提供了解码它们的输出插件回调函数。当前正在解码的已准备事务可能因为并发执行 ROLLBACK PREPARED
命令而被中止。在这种情况下,此事务的逻辑解码也将被中止。一旦检测到中止并调用了 prepare_cb
回调函数,就会跳过此类事务的所有更改。因此,即使发生并发中止,也会向输出插件提供足够的信息,以便它能够正确处理 ROLLBACK PREPARED
。
只有已安全刷新到磁盘的事务才会被解码。当 synchronous_commit
设置为 off
时,这可能导致在直接调用的 pg_logical_slot_get_changes()
中 COMMIT
未立即被解码。
可选的 startup_cb
回调函数在创建复制槽或请求流式传输更改时被调用,而不管有多少更改已准备好输出。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
is_init
参数在创建复制槽时为 true,否则为 false。options
指向一个输出插件可以设置的选项结构。
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
必须设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
或 OUTPUT_PLUGIN_BINARY_OUTPUT
。另请参阅 Section 47.6.3。如果 receive_rewrites
为 true,则输出插件在某些 DDL 操作期间对堆重写所做的更改也会被调用。这些对于处理 DDL 复制的插件很有用,但需要特殊处理。
startup 回调函数应验证 ctx->output_plugin_options
中的选项。如果输出插件需要状态,它可以使用 ctx->output_plugin_private
来存储它。
可选的 shutdown_cb
回调函数在曾经活跃的复制槽不再使用时被调用,可用于释放输出插件私有的资源。槽不一定会被删除,只是停止流式传输。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
必需的 begin_cb
回调函数在解码已提交事务的开始时被调用。中止的事务及其内容永远不会被解码。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
txn
参数包含关于事务的元信息,例如它提交的时间戳及其 XID。
必需的 commit_cb
回调函数在解码事务提交时被调用。如果存在已修改的行,则在该函数调用之前,所有已修改行的 change_cb
回调函数已被调用。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
必需的 change_cb
回调函数针对事务中的每个单独的行修改被调用,无论是 INSERT
、UPDATE
还是 DELETE
。即使原始命令一次修改了多行,该回调函数也会为每一行单独调用。change_cb
回调函数可以访问系统或用户目录表,以协助输出行修改的详细信息。在解码已准备(但尚未提交)的事务或未提交事务时,由于该事务的同步回滚,此更改回调函数也可能出错。在这种情况下,此中止事务的逻辑解码将优雅地停止。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
ctx
和 txn
参数的内容与 begin_cb
和 commit_cb
回调函数相同,但此外,关系描述符 relation
指向该行所属的关系,并且一个描述行修改的结构 change
被传入。
只有用户定义的表中非未记录(参见 UNLOGGED
)且非临时(参见 TEMPORARY
或 TEMP
)的表中的更改才能使用逻辑解码提取。
可选的 truncate_cb
回调函数在执行 TRUNCATE
命令时被调用。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
参数类似于 change_cb
回调函数。但是,由于通过外键连接的表上的 TRUNCATE
操作需要一起执行,因此此回调函数接收一个关系数组而不是单个关系。有关详细信息,请参阅 TRUNCATE 语句的描述。
调用可选的 filter_by_origin_cb
回调函数以确定来自 origin_id
的已重放数据是否对输出插件感兴趣。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
ctx
参数的内容与其他回调函数相同。只能获得源信息。要表示来自传入节点的更改无关紧要,请返回 true,从而过滤掉它们;否则返回 false。其他回调函数不会为已被过滤掉的事务和更改调用。
这对于实现级联或多向复制解决方案很有用。按源过滤允许在这些设置中防止相同的更改来回复制。虽然事务和更改也包含源信息,但通过此回调函数进行过滤效率明显更高。
可选的 message_cb
回调函数在解码逻辑解码消息时被调用。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
txn
参数包含关于事务的元信息,例如它提交的时间戳及其 XID。但请注意,当消息非事务性且在记录消息的事务中尚未分配 XID 时,它可以为 NULL。lsn
是消息的 WAL 位置。transactional
表示消息是作为事务性发送还是非事务性发送。与更改回调函数类似,在解码已准备(但尚未提交)的事务或未提交事务时,由于该事务的同步回滚,此消息回调函数也可能出错。在这种情况下,此中止事务的逻辑解码将优雅地停止。prefix
是一个任意的 null 终止前缀,可用于识别当前插件感兴趣的消息。最后,message
参数包含 message_size
大小的实际消息。
应格外小心,确保输出插件认为有趣的prefix是唯一的。使用扩展名或输出插件本身的名称通常是一个不错的选择。
可选的 filter_prepare_cb
回调函数用于确定当前两阶段提交事务中的数据是在准备阶段解码,还是在 COMMIT PREPARED
时作为常规单阶段事务解码。要指示跳过解码,请返回 true
;否则返回 false
。当未定义回调函数时,假定为 false
(即不进行过滤,所有使用两阶段提交的事务也以两阶段进行解码)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
ctx
参数的内容与其他回调函数相同。xid
和 gid
参数提供了两种不同的事务标识方式。稍后的 COMMIT PREPARED
或 ROLLBACK PREPARED
会携带这两个标识符,让输出插件可以选择使用哪个。
对于每个事务,该回调函数可能会被调用多次以进行解码,并且每次调用时都必须为给定的 xid
和 gid
对提供相同的静态答案。
必需的 begin_prepare_cb
回调函数在解码已准备事务的开始时被调用。gid
字段包含在 txn
参数中,可以在此回调函数中使用,以检查插件是否已收到此 PREPARE
,在这种情况下,它可以出错或跳过该事务的剩余更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必需的 prepare_cb
回调函数在解码已准备提交的两阶段事务时被调用。如果存在已修改的行,则在调用此函数之前,所有已修改行的 change_cb
回调函数已被调用。gid
字段包含在 txn
参数中,可以在此回调函数中使用。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
必需的 commit_prepared_cb
回调函数在解码事务 COMMIT PREPARED
时被调用。gid
字段包含在 txn
参数中,可以在此回调函数中使用。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
必需的 rollback_prepared_cb
回调函数在解码事务 ROLLBACK PREPARED
时被调用。gid
字段包含在 txn
参数中,可以在此回调函数中使用。prepare_end_lsn
和 prepare_time
参数可用于检查插件是否收到了此 PREPARE TRANSACTION
,在这种情况下,它可以应用回滚,否则,它可以跳过回滚操作。gid
本身不足以区分,因为下游节点可能有一个具有相同标识符的已准备事务。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
必需的 stream_start_cb
回调函数在从进行中的事务中打开一个流式更改块时被调用。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必需的 stream_stop_cb
回调函数在关闭从进行中的事务中流式传输的更改块时被调用。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
必需的 stream_abort_cb
回调函数用于中止先前已流式传输的事务。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
stream_prepare_cb
回调函数在准备先前已流式传输的事务作为两阶段提交的一部分时被调用。当输出插件同时支持大型进行中事务的流式传输和两阶段提交时,此回调函数是必需的。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
必需的 stream_commit_cb
回调函数在提交先前已流式传输的事务时被调用。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
必需的 stream_change_cb
回调函数在流式更改块(由 stream_start_cb
和 stream_stop_cb
调用分隔)中发送更改时被调用。实际更改不会显示,因为事务稍后可能会中止,而我们不会解码已中止事务的更改。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
可选的 stream_message_cb
回调函数在流式更改块(由 stream_start_cb
和 stream_stop_cb
调用分隔)中发送通用消息时被调用。事务性消息的内容不会显示,因为事务稍后可能会中止,而我们不会解码已中止事务的更改。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
可选的 stream_truncate_cb
回调函数在流式更改块(由 stream_start_cb
和 stream_stop_cb
调用分隔)中的 TRUNCATE
命令时被调用。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
参数类似于 stream_change_cb
回调函数。但是,由于通过外键连接的表上的 TRUNCATE
操作需要一起执行,因此此回调函数接收一个关系数组而不是单个关系。有关详细信息,请参阅 TRUNCATE 语句的描述。
要实际生成输出,输出插件可以在 begin_cb
、commit_cb
或 change_cb
回调函数中使用 ctx->out
中的 StringInfo
输出缓冲区写入数据。在写入输出缓冲区之前,必须调用 OutputPluginPrepareWrite(ctx, last_write)
,并在完成写入缓冲区后,必须调用 OutputPluginWrite(ctx, last_write)
来执行写入。last_write
指示特定的写入是否是回调的最后一次写入。
以下示例展示了如何向输出插件的消费者输出数据
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);
如果您在文档中发现任何不正确、与您的实际体验不符或需要进一步澄清的内容,请使用 此表格 报告文档问题。