Apache Paimon Primary Key 表

创建表时,默认的表类型是 Changelog 表。用户可以在该表中插入、更新或删除记录。

主键由一组包含每个记录唯一值的列组成。Paimon 通过在每个 Bucket 内对主键进行排序来强制执行数据排序,使用户能够通过在主键上应用过滤条件来实现高性能。

通过在 Changelog 表上定义主键,用户可以获得以下功能。

Bucket

Bucket 是读写的最小存储单元,每个 Bucket 目录包含一个 LSM 树。

Fixed Bucket

配置一个大于 0 的 Bucket 数量,对 Bucket 进行重新划分只能通过离线方式处理,参见Rescale Bucket,过多的 Bucket 会导致过多的小文件,而过少的 Bucket 会导致写入性能下降。

Dynamic Bucket

配置 'bucket' = '-1'的话,Paimon 会动态维护索引,自动扩展 Bucket 的数量。

  • dynamic-bucket.target-row-num:控制一个 Bucket 的目标行数。
  • dynamic-bucket.assigner-parallelism:分配器操作符的并行度,控制初始化 Bucket 的数量。

注:动态 Bucket 只支持单个写入作业。请不要启动多个作业同时写入同一个分区。

普通的动态 Bucket 模式

当你的更新操作不跨越分区(没有分区,或者主键包含所有分区字段)时,动态 Bucket 模式使用哈希索引来维护从键到 Bucket 的映射关系,它需要比固定 Bucket 模式更多的内存。
性能:

  • 一般而言,性能不会有损失,但会有一些额外的内存消耗。每个分区中的 1 亿条记录会增加约 1 GB 的内存消耗,不再活跃的分区不会占用内存。
  • 对于更新频率较低的表,建议使用此模式来显著提高性能。

普通的动态 Bucket 模式的排序压缩

普通动态 Bucket 模式支持排序压缩以加速查询。

跨分区 Upsert 动态 Bucket 模式

这是一个实验性的功能。

当需要进行跨分区的 Upsert 操作(主键不包含所有分区字段)时,动态 Bucket 模式直接维护键到分区和 Bucket 的映射关系,使用本地磁盘,并在启动流式写入作业时通过读取表中所有现有键来初始化索引。不同的合并引擎具有不同的行为:

  • Deduplicate:从旧的分区删除数据,并将新数据插入到新的分区中。
  • PartialUpdate & Aggregation:将新数据插入到旧的分区中。
  • FirstRow:如果存在旧值,则忽略新数据。

性能:对于数据量较大的表,性能会有显著损失。此外,初始化过程需要很长时间。

如果 Upsert 操作不依赖于过旧的数据,可以考虑配置索引的 TTL 以减少索引和初始化时间:
cross-partition-upsert.index-ttl:RocksDB 索引和初始化的 TTL,这可以避免维护过多的索引并导致性能逐渐变差。但请注意,这也可能会导致数据重复。

Merge Engines

当 Paimon Sink 接收到两条或者两条以上相同主键的数据,它会将它们合并成一条保证主键唯一,通过指定表的 merge-engine 属性,用户可以选择如何将数据合并在一起。

注:在 Flink SQL TableConfig 中始终将 table.exec.sink.upsert-materialize 设置为 NONE,因为启用 sink upsert-materialize 可能会导致奇怪的行为。当输入数据无序时,建议使用序列字段来纠正无序性。

Deduplicate

Deduplicate 是默认的 merge engine。Paimon 只会保留最新的记录,并丢弃具有相同主键的其他记录。
具体而言,如果最新的记录是一个 DELETE 记录,那么所有具有相同主键的记录都将被删除。

Partial Update

通过指定 ‘merge-engine’ = ‘partial-update’,用户可以通过多次更新来更新记录的列,直到记录完整为止。这是通过逐个更新值字段,使用相同主键下的最新数据来实现的。但在这个过程中,空值不会被覆盖。
例如,假设 Paimon 收到三条记录:

<1, 23.0, 10, NULL>

<1, NULL, NULL, 'This is a book'>

<1, 25.2, NULL, NULL>

假设第一列是主键,最终的结果将是 <1, 25.2, 10, 'This is a book'>。

对于流式查询,partial-update 合并引擎必须与查找(lookup)或完全合并(full-compaction)的 changelog producer 一起使用。(’input’ changelog producer 也支持,但只返回输入记录。)

默认情况下,部分更新不接受删除记录,可以选择以下解决方案之一:

  • 配置 ‘partial-update.ignore-delete’ 以忽略删除记录。
  • 配置 ‘sequence-group’ 来撤销部分列。

Sequence Group

一个序列字段可能无法解决具有多个流式更新的 partial-update 表的乱序问题,因为在多流更新期间,序列字段可能会被另一个流的最新数据覆盖。

因此,我们为 partial-update 表引入了序列组机制。它可以解决以下问题:

  • 多流更新期间的乱序问题。每个流定义自己的序列组。
  • 真正的 partial-update,而不仅仅是非空更新。

请参考以下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CREATE TABLE T (
k INT,
a INT,
b INT,
g_1 INT,
c INT,
d INT,
g_2 INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update',
'fields.g_1.sequence-group'='a,b',
'fields.g_2.sequence-group'='c,d'
);

INSERT INTO T VALUES (1, 1, 1, 1, 1, 1, 1);

-- g_2 is null, c, d should not be updated
INSERT INTO T VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

SELECT * FROM T; -- output 1, 2, 2, 2, 1, 1, 1

-- g_1 is smaller, a, b should not be updated
INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);

SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3

对于字段 sequence-group,有效的比较数据类型包括:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP 和 TIMESTAMP_LTZ。

Default Value

如果无法保证数据的顺序,并且字段仅通过覆盖 null 进行写入,那么在读取表时,未被覆盖的字段将显示为 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE T (
k INT,
a INT,
b INT,
c INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update'
);
INSERT INTO T VALUES (1, 1,null,null);
INSERT INTO T VALUES (1, null,null,1);

SELECT * FROM T; -- output 1, 1, null, 1

如果期望在读取表时,未被覆盖的字段具有默认值而不是 null,则需要使用 ‘fields.name.default-value’。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE T (
k INT,
a INT,
b INT,
c INT,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine'='partial-update',
'fields.b.default-value'='0'
);

INSERT INTO T VALUES (1, 1,null,null);
INSERT INTO T VALUES (1, null,null,1);

SELECT * FROM T; -- output 1, 1, 0, 1

Aggregation

注意:在 Flink SQL TableConfig 中,始终将 table.exec.sink.upsert-materialize 设置为 NONE。
有时用户只关心聚合结果。aggregation 合并引擎根据聚合函数,逐个将每个值字段与相同主键下的最新数据进行聚合。

非主键字段可以指定聚合函数,通过 fields..aggregate-function 表属性来指定,否则将默认使用 last_non_null_value 聚合。例如,考虑以下表定义。

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. Bucket
    1. 1.1. Fixed Bucket
    2. 1.2. Dynamic Bucket
      1. 1.2.1. 普通的动态 Bucket 模式
      2. 1.2.2. 普通的动态 Bucket 模式的排序压缩
      3. 1.2.3. 跨分区 Upsert 动态 Bucket 模式
  2. 2. Merge Engines
    1. 2.1. Deduplicate
    2. 2.2. Partial Update
      1. 2.2.1. Sequence Group
      2. 2.2.2. Default Value
    3. 2.3. Aggregation