可以在 PostgreSQL 源代码树的 contrib/test_decoding 子目录中找到示例输出插件。
输出插件通过动态加载一个共享库来加载,该共享库的库基本名称与输出插件的名称相同。使用正常的库搜索路径来查找该库。为了提供所需的输出插件回调并指示该库实际上是一个输出插件,它需要提供一个名为 _PG_output_plugin_init 的函数。此函数被传递一个结构体,该结构体需要填充各个操作的回调函数指针。
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
需要 begin_cb、change_cb 和 commit_cb 回调,而 startup_cb、truncate_cb、message_cb、filter_by_origin_cb 和 shutdown_cb 是可选的。如果未设置 truncate_cb 但要解码 TRUNCATE,则该操作将被忽略。
输出插件还可以定义函数来支持大型进行中事务的流式传输。需要 stream_start_cb、stream_stop_cb、stream_abort_cb、stream_commit_cb 和 stream_change_cb,而 stream_message_cb 和 stream_truncate_cb 是可选的。如果输出插件也支持两阶段提交,则还需要 stream_prepare_cb。
输出插件还可以定义函数来支持两阶段提交,这允许在 PREPARE TRANSACTION 上解码操作。需要 begin_prepare_cb、prepare_cb、commit_prepared_cb 和 rollback_prepared_cb 回调,而 filter_prepare_cb 是可选的。如果输出插件也支持大型进行中事务的流式传输,则还需要 stream_prepare_cb。
为了解码、格式化和输出更改,输出插件可以使用后端的大部分正常基础架构,包括调用输出函数。只要只访问由 initdb 在 pg_catalog 模式中创建的关系,或者使用以下方式标记为用户提供的目录表的关系,则允许对关系进行只读访问:
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
请注意,必须仅通过 systable_* 扫描 API 来访问输出插件中的用户目录表或常规系统目录表。通过 heap_* 扫描 API 访问将会出错。此外,禁止执行任何导致事务 ID 分配的操作。这包括写入表、执行 DDL 更改和调用 pg_current_xact_id() 等。
输出插件回调可以以几乎任意的格式将数据传递给消费者。对于某些用例(例如通过 SQL 查看更改),以可以包含任意数据的数据类型(例如 bytea)返回数据是很麻烦的。如果输出插件仅以服务器的编码输出文本数据,则可以通过在 启动回调中将 OutputPluginOptions.output_type 设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 而不是 OUTPUT_PLUGIN_BINARY_OUTPUT 来声明这一点。在这种情况下,所有数据都必须采用服务器的编码,以便 text 数据可以包含它。这在启用断言的构建中进行检查。
输出插件会通过它需要提供的各种回调来接收有关正在发生的更改的通知。
并发事务按提交顺序解码,并且在 begin 和 commit 回调之间仅解码属于特定事务的更改。明确或隐式回滚的事务永远不会被解码。成功的保存点会按照它们在该事务中执行的顺序,折叠到包含它们的事务中。如果提供了用于解码它们的输出插件回调,则使用 PREPARE TRANSACTION 为两阶段提交准备的事务也将被解码。当前正在解码的准备事务可能会通过 ROLLBACK PREPARED 命令并发中止。在这种情况下,此事务的逻辑解码也将中止。一旦检测到中止并调用 prepare_cb 回调,将跳过此类事务的所有更改。因此,即使在发生并发中止的情况下,也为输出插件提供了足够的信息,以便在解码 ROLLBACK PREPARED 时正确处理它。
仅解码已安全刷新到磁盘的事务。当 synchronous_commit 设置为 off 时,这可能会导致在直接后续的 pg_logical_slot_get_changes() 中不会立即解码 COMMIT。
每当创建复制槽或请求流式传输更改时,都会调用可选的 startup_cb 回调,而与准备输出的更改数量无关。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
当正在创建复制槽时,is_init 参数将为 true,否则为 false。options 指向输出插件可以设置的选项结构体
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
output_type 必须设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 或 OUTPUT_PLUGIN_BINARY_OUTPUT。另请参阅 第 47.6.3 节。如果 receive_rewrites 为 true,则对于在某些 DDL 操作期间由堆重写所做的更改,也将调用输出插件。这些对于处理 DDL 复制的插件很有用,但它们需要特殊处理。
启动回调应验证 ctx->output_plugin_options 中存在的选项。如果输出插件需要具有状态,则可以使用 ctx->output_plugin_private 来存储它。
每当不再使用以前活动的复制槽时,都会调用可选的 shutdown_cb 回调,并且可以使用该回调来释放输出插件私有的资源。该槽不一定被删除,只是停止了流式传输。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
每当已解码已提交事务的开始时,都会调用必需的 begin_cb 回调。中止的事务及其内容永远不会被解码。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
txn 参数包含有关事务的元信息,例如其提交的时间戳及其 XID。
每当已解码事务提交时,都会调用必需的 commit_cb 回调。如果存在任何修改的行,则在此之前,将已调用所有已修改行的 change_cb 回调。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
对于事务内的每一次单独的行修改,都会调用必需的 change_cb 回调函数,无论是 INSERT、UPDATE 还是 DELETE 操作。即使原始命令一次修改了多行,回调函数也会针对每一行单独调用。 change_cb 回调函数可以访问系统或用户目录表,以帮助输出行修改的详细信息。如果解码一个已准备好(但尚未提交)的事务或解码一个未提交的事务,则此更改回调也可能因同时回滚此同一事务而报错。在这种情况下,将优雅地停止对已中止事务的逻辑解码。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
ctx 和 txn 参数的内容与 begin_cb 和 commit_cb 回调函数相同,但此外,关系描述符 relation 指向行所属的关系,并且还会传入一个描述行修改的结构体 change。
只有用户定义的、不是未记录的(参见 UNLOGGED)且不是临时的(参见 TEMPORARY 或 TEMP)表中的更改才能使用逻辑解码提取。
对于 TRUNCATE 命令,将调用可选的 truncate_cb 回调函数。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
参数与 change_cb 回调函数类似。但是,由于对通过外键连接的表执行 TRUNCATE 操作需要一起执行,因此此回调接收一个关系数组,而不是单个关系。有关详细信息,请参见 TRUNCATE 语句的描述。
将调用可选的 filter_by_origin_cb 回调函数,以确定从 origin_id 重放的数据是否是输出插件感兴趣的。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
ctx 参数的内容与其他回调函数相同。除了源之外,没有其他信息可用。要指示源于传入节点上的更改是不相关的,则返回 true,这将导致它们被过滤掉;否则返回 false。对于已被过滤掉的事务和更改,将不会调用其他回调函数。
这在实现级联或多向复制解决方案时非常有用。通过源进行筛选可以防止在这些设置中来回复制相同的更改。虽然事务和更改也携带有关源的信息,但是通过此回调进行筛选效率明显更高。
每当解码逻辑解码消息时,都会调用可选的 message_cb 回调函数。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
txn 参数包含有关事务的元信息,例如其提交的时间戳和 XID。但是请注意,当消息是非事务性的并且在记录消息的事务中尚未分配 XID 时,它可以为 NULL。lsn 具有消息的 WAL 位置。transactional 指示消息是否作为事务性消息发送。与更改回调类似,如果解码已准备好(但尚未提交)的事务或解码未提交的事务,则此消息回调也可能因同时回滚此同一事务而报错。在这种情况下,将优雅地停止对已中止事务的逻辑解码。prefix 是任意以 null 结尾的前缀,可用于标识当前插件感兴趣的消息。最后,message 参数保存大小为 message_size 的实际消息。
应格外小心,以确保输出插件认为感兴趣的前缀是唯一的。通常,使用扩展名或输出插件本身的名称是一个不错的选择。
将调用可选的 filter_prepare_cb 回调函数,以确定是否应在此准备阶段或稍后在 COMMIT PREPARED 时作为常规单阶段事务来考虑对当前两阶段提交事务中的数据进行解码。要指示应跳过解码,请返回 true;否则返回 false。如果未定义回调,则假定为 false(即,不进行筛选,所有使用两阶段提交的事务也都分两个阶段解码)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
ctx 参数的内容与其他回调函数相同。参数 xid 和 gid 提供了两种不同的方式来标识事务。后面的 COMMIT PREPARED 或 ROLLBACK PREPARED 都带有标识符,从而为输出插件提供了选择使用哪种标识符的选项。
每个事务可以多次调用回调函数进行解码,并且每次调用时都必须为给定的 xid 和 gid 对提供相同的静态答案。
每当解码准备好的事务的开始时,都会调用必需的 begin_prepare_cb 回调函数。gid 字段是 txn 参数的一部分,可以在此回调函数中使用,以检查插件是否已收到此 PREPARE,在这种情况下,它可以报错或跳过事务的其余更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
每当解码为两阶段提交准备的事务时,都会调用必需的 prepare_cb 回调函数。如果存在任何修改的行,则会在此之前调用所有已修改行的 change_cb 回调函数。gid 字段是 txn 参数的一部分,可以在此回调函数中使用。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
每当解码 COMMIT PREPARED 事务时,都会调用必需的 commit_prepared_cb 回调函数。gid 字段是 txn 参数的一部分,可以在此回调函数中使用。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
每当解码 ROLLBACK PREPARED 事务时,都会调用必需的 rollback_prepared_cb 回调函数。gid 字段是 txn 参数的一部分,可以在此回调函数中使用。参数 prepare_end_lsn 和 prepare_time 可用于检查插件是否已收到此 PREPARE TRANSACTION,在这种情况下,它可以应用回滚,否则,它可以跳过回滚操作。仅 gid 是不够的,因为下游节点可以具有具有相同标识符的准备好的事务。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
当打开正在进行中的事务的流式更改块时,将调用必需的 stream_start_cb 回调函数。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
当关闭正在进行中的事务的流式更改块时,将调用必需的 stream_stop_cb 回调函数。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
调用必需的 stream_abort_cb 回调函数以中止先前流式传输的事务。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
当输出插件同时支持大型正在进行中的事务的流式传输和两阶段提交时,需要调用 stream_prepare_cb 回调函数,以准备先前流式传输的事务作为两阶段提交的一部分。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
调用必需的 stream_commit_cb 回调函数以提交先前流式传输的事务。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
当在流式更改块(由 stream_start_cb 和 stream_stop_cb 调用分隔)中发送更改时,会调用必需的 stream_change_cb 回调函数。实际更改不会显示,因为事务可能在稍后时间中止,并且我们不会解码已中止事务的更改。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
当在流式更改块(由 stream_start_cb 和 stream_stop_cb 调用分隔)中发送通用消息时,会调用可选的 stream_message_cb 回调函数。不会显示事务性消息的消息内容,因为事务可能在稍后时间中止,并且我们不会解码已中止事务的更改。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
对于流式更改块(由 stream_start_cb 和 stream_stop_cb 调用分隔)中的 TRUNCATE 命令,将调用可选的 stream_truncate_cb 回调函数。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
参数与 stream_change_cb 回调函数类似。但是,由于对通过外键连接的表执行 TRUNCATE 操作需要一起执行,因此此回调接收一个关系数组,而不是单个关系。有关详细信息,请参见 TRUNCATE 语句的描述。
要实际生成输出,输出插件可以在 begin_cb、commit_cb 或 change_cb 回调函数内将数据写入 ctx->out 中的 StringInfo 输出缓冲区。在写入输出缓冲区之前,必须调用 OutputPluginPrepareWrite(ctx, last_write),在完成写入缓冲区后,必须调用 OutputPluginWrite(ctx, last_write) 来执行写入。last_write 指示特定写入是否是回调函数的最后一次写入。
以下示例说明如何将数据输出到输出插件的使用者
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);
如果您在文档中看到任何不正确、与您使用特定功能的体验不符或需要进一步说明的内容,请使用此表单报告文档问题。