2025年9月25日: PostgreSQL 18 发布!
支持的版本: 当前 (18) / 17 / 16 / 15 / 14 / 13
开发版本: devel
不支持的版本: 12 / 11 / 10

29.2. 订阅 #

一个 订阅 是逻辑复制的下游。定义订阅的节点称为 订阅者。订阅定义了与另一个数据库的连接以及它想要订阅的一个或多个发布。

订阅者数据库的行为与其他任何 PostgreSQL 实例相同,并且可以通过定义自己的发布来作为其他数据库的发布者。

订阅者节点可以有多个订阅。可以在单个发布者-订阅者对之间定义多个订阅,在这种情况下,必须小心确保订阅的发布对象不重叠。

每个订阅将通过一个复制槽接收更改(参见 第 26.2.6 节)。可能需要额外的复制槽来进行预先存在的表数据的初始数据同步,这些同步槽将在数据同步结束时被删除。

逻辑复制订阅可以作为同步复制的备用(参见 第 26.2.8 节)。备用名称默认为订阅名称。可以在订阅的连接信息中的 application_name 中指定备用名称。

如果当前用户是超级用户,则订阅会被 pg_dump 转储。否则会写入警告并跳过订阅,因为非超级用户无法从 pg_subscription 目录中读取所有订阅信息。

订阅使用 CREATE SUBSCRIPTION 添加,并可以使用 ALTER SUBSCRIPTION 命令随时停止/恢复,并使用 DROP SUBSCRIPTION 删除。

当订阅被删除并重新创建时,同步信息会丢失。这意味着之后必须重新同步数据。

模式定义不会被复制,发布表的表必须存在于订阅者上。只有常规表可以是复制的目标。例如,不能复制到视图。

发布者和订阅者之间的表使用完全限定的表名进行匹配。不支持复制到订阅者上不同名称的表。

表的列也按名称匹配。订阅者表中的列顺序不必与发布者中的匹配。列的数据类型不必匹配,只要数据的文本表示可以转换为目标类型即可。例如,可以从 integer 类型的列复制到 bigint 类型的列。目标表也可以包含发布表中未提供的额外列。任何此类列都将使用目标表定义中指定的默认值进行填充。但是,二进制格式的逻辑复制更为严格。有关详细信息,请参阅 CREATE SUBSCRIPTIONbinary 选项。

29.2.1. 复制槽管理 #

如前所述,每个(活动的)订阅从远程(发布)端的复制槽接收更改。

额外的表同步槽通常是临时的,内部创建用于执行初始表同步,并在不再需要时自动删除。这些表同步槽具有生成的名称:pg_%u_sync_%u_%llu(参数:订阅 oid,表 relid,系统标识符 sysid)。

通常,使用 CREATE SUBSCRIPTION 创建订阅时,远程复制槽会自动创建,并在使用 DROP SUBSCRIPTION 删除订阅时自动删除。但在某些情况下,单独操作订阅和底层复制槽可能有用或必要。以下是一些场景:

  • 创建订阅时,复制槽已存在。在这种情况下,可以使用 create_slot = false 选项创建订阅,以关联现有槽。

  • 创建订阅时,远程主机不可达或状态不明确。在这种情况下,可以使用 connect = false 选项创建订阅。这样就不会联系远程主机。这是 pg_dump 使用的方式。然后必须在订阅激活之前手动创建远程复制槽。

  • 删除订阅时,应保留复制槽。当订阅者数据库移动到不同的主机并从那里激活时,这可能很有用。在这种情况下,在尝试删除订阅之前,使用 ALTER SUBSCRIPTION 将槽与订阅分离。

  • 删除订阅时,远程主机不可达。在这种情况下,在尝试删除订阅之前,使用 ALTER SUBSCRIPTION 将槽与订阅分离。如果远程数据库实例不再存在,则无需进一步操作。但是,如果远程数据库实例只是不可达,那么复制槽(以及任何剩余的表同步槽)应该被手动删除;否则它/它们将继续占用 WAL 并可能最终导致磁盘空间不足。应仔细调查此类情况。

29.2.2. 示例: 设置逻辑复制 #

在发布者上创建一些测试表。

/* pub # */ CREATE TABLE t1(a int, b text, PRIMARY KEY(a));
/* pub # */ CREATE TABLE t2(c int, d text, PRIMARY KEY(c));
/* pub # */ CREATE TABLE t3(e int, f text, PRIMARY KEY(e));

在订阅者上创建相同的表。

/* sub # */ CREATE TABLE t1(a int, b text, PRIMARY KEY(a));
/* sub # */ CREATE TABLE t2(c int, d text, PRIMARY KEY(c));
/* sub # */ CREATE TABLE t3(e int, f text, PRIMARY KEY(e));

在发布者端向表中插入数据。

/* pub # */ INSERT INTO t1 VALUES (1, 'one'), (2, 'two'), (3, 'three');
/* pub # */ INSERT INTO t2 VALUES (1, 'A'), (2, 'B'), (3, 'C');
/* pub # */ INSERT INTO t3 VALUES (1, 'i'), (2, 'ii'), (3, 'iii');

为表创建发布。发布 pub2pub3a 阻止某些 publish 操作。发布 pub3b 有一个行过滤器(参见 第 29.4 节)。

/* pub # */ CREATE PUBLICATION pub1 FOR TABLE t1;
/* pub # */ CREATE PUBLICATION pub2 FOR TABLE t2 WITH (publish = 'truncate');
/* pub # */ CREATE PUBLICATION pub3a FOR TABLE t3 WITH (publish = 'truncate');
/* pub # */ CREATE PUBLICATION pub3b FOR TABLE t3 WHERE (e > 5);

为发布创建订阅。订阅 sub3 订阅 pub3apub3b。所有订阅默认都会复制初始数据。

/* sub # */ CREATE SUBSCRIPTION sub1
/* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=sub1'
/* sub - */ PUBLICATION pub1;
/* sub # */ CREATE SUBSCRIPTION sub2
/* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=sub2'
/* sub - */ PUBLICATION pub2;
/* sub # */ CREATE SUBSCRIPTION sub3
/* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=sub3'
/* sub - */ PUBLICATION pub3a, pub3b;

观察到初始表数据被复制,无论发布的 publish 操作如何。

/* sub # */ SELECT * FROM t1;
 a |   b
---+-------
 1 | one
 2 | two
 3 | three
(3 rows)

/* sub # */ SELECT * FROM t2;
 c | d
---+---
 1 | A
 2 | B
 3 | C
(3 rows)

此外,由于初始数据复制忽略了 publish 操作,并且因为发布 pub3a 没有行过滤器,这意味着复制的表 t3 包含所有行,即使它们不匹配发布 pub3b 的行过滤器。

/* sub # */ SELECT * FROM t3;
 e |  f
---+-----
 1 | i
 2 | ii
 3 | iii
(3 rows)

在发布者端向表中插入更多数据。

/* pub # */ INSERT INTO t1 VALUES (4, 'four'), (5, 'five'), (6, 'six');
/* pub # */ INSERT INTO t2 VALUES (4, 'D'), (5, 'E'), (6, 'F');
/* pub # */ INSERT INTO t3 VALUES (4, 'iv'), (5, 'v'), (6, 'vi');

现在发布者端的数据如下所示:

/* pub # */ SELECT * FROM t1;
 a |   b
---+-------
 1 | one
 2 | two
 3 | three
 4 | four
 5 | five
 6 | six
(6 rows)

/* pub # */ SELECT * FROM t2;
 c | d
---+---
 1 | A
 2 | B
 3 | C
 4 | D
 5 | E
 6 | F
(6 rows)

/* pub # */ SELECT * FROM t3;
 e |  f
---+-----
 1 | i
 2 | ii
 3 | iii
 4 | iv
 5 | v
 6 | vi
(6 rows)

观察到在正常复制期间使用了适当的 publish 操作。这意味着发布 pub2pub3a 不会复制 INSERT。同样,发布 pub3b 只会复制匹配 pub3b 行过滤器的 pub3b。现在订阅者端的数据如下所示:

/* sub # */ SELECT * FROM t1;
 a |   b
---+-------
 1 | one
 2 | two
 3 | three
 4 | four
 5 | five
 6 | six
(6 rows)

/* sub # */ SELECT * FROM t2;
 c | d
---+---
 1 | A
 2 | B
 3 | C
(3 rows)

/* sub # */ SELECT * FROM t3;
 e |  f
---+-----
 1 | i
 2 | ii
 3 | iii
 6 | vi
(4 rows)

29.2.3. 示例: 延迟复制槽创建 #

在某些情况下(例如 第 29.2.1 节),如果远程复制槽未自动创建,用户必须手动创建它才能激活订阅。以下示例显示了创建槽和激活订阅的步骤。这些示例指定了标准的逻辑解码输出插件(pgoutput),这是内置逻辑复制使用的。

首先,为示例创建一个发布。

/* pub # */ CREATE PUBLICATION pub1 FOR ALL TABLES;

示例 1: 订阅指定 connect = false

  • 创建订阅。

    /* sub # */ CREATE SUBSCRIPTION sub1
    /* sub - */ CONNECTION 'host=localhost dbname=test_pub'
    /* sub - */ PUBLICATION pub1
    /* sub - */ WITH (connect=false);
    WARNING:  subscription was created, but is not connected
    HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
    
  • 在发布者上,手动创建一个槽。因为在 CREATE SUBSCRIPTION 期间未指定名称,所以要创建的槽的名称与订阅名称相同,例如“sub1”。

    /* pub # */ SELECT * FROM pg_create_logical_replication_slot('sub1', 'pgoutput');
     slot_name |    lsn
    -----------+-----------
     sub1      | 0/19404D0
    (1 row)
    
  • 在订阅者上,完成订阅的激活。之后 pub1 的表将开始复制。

    /* sub # */ ALTER SUBSCRIPTION sub1 ENABLE;
    /* sub # */ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
    

示例 2: 订阅指定 connect = false,但还指定了 slot_name 选项。

  • 创建订阅。

    /* sub # */ CREATE SUBSCRIPTION sub1
    /* sub - */ CONNECTION 'host=localhost dbname=test_pub'
    /* sub - */ PUBLICATION pub1
    /* sub - */ WITH (connect=false, slot_name='myslot');
    WARNING:  subscription was created, but is not connected
    HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
    
  • 在发布者上,使用在 CREATE SUBSCRIPTION 期间指定的相同名称(例如,“myslot”)手动创建一个槽。

    /* pub # */ SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput');
     slot_name |    lsn
    -----------+-----------
     myslot    | 0/19059A0
    (1 row)
    
  • 在订阅者上,其余的订阅激活步骤与之前相同。

    /* sub # */ ALTER SUBSCRIPTION sub1 ENABLE;
    /* sub # */ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
    

示例 3: 订阅指定 slot_name = NONE

  • 创建订阅。当 slot_name = NONE 时,也需要 enabled = falsecreate_slot = false

    /* sub # */ CREATE SUBSCRIPTION sub1
    /* sub - */ CONNECTION 'host=localhost dbname=test_pub'
    /* sub - */ PUBLICATION pub1
    /* sub - */ WITH (slot_name=NONE, enabled=false, create_slot=false);
    
  • 在发布者上,使用任何名称(例如,“myslot”)手动创建一个槽。

    /* pub # */ SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput');
     slot_name |    lsn
    -----------+-----------
     myslot    | 0/1905930
    (1 row)
    
  • 在订阅者上,将订阅与刚刚创建的槽名称关联。

    /* sub # */ ALTER SUBSCRIPTION sub1 SET (slot_name='myslot');
    
  • 其余的订阅激活步骤与之前相同。

    /* sub # */ ALTER SUBSCRIPTION sub1 ENABLE;
    /* sub # */ ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
    

提交更正

如果您在文档中看到任何不正确、与您对特定功能的使用体验不符或需要进一步澄清的内容,请使用 此表格 报告文档问题。