跳到主要内容

在 Ent 模式中使用数据库触发器

触发器在关系型数据库中是比较有用的工具,可以允许你在特定时间发生时在表上执行自定义代码。 例如,当突变在不同的表上应用时触发器可以自动填充审计日志。 这样我们就能确认所有变更(包括其他程序所做变更)都可以被详细记录下来,实现数据库层面的强制执行并减少应用中的额外代码。

本指南说明如何在 Ent 类型(对象)中添加触发器,并配置模式迁移,以便使用 Atlas 将 触发器和 Ent 模式作为单一迁移单元进行管理。

Atlas 只为专业用户提供 触发器支持,使用这些功能需运行:

atlas login

安装 Atlas

要安装Atlas的最新版本,只需在终端中运行以下任一命令,或访问Atlas 官方网站

curl -sSf https://atlasgo.sh | sh

登录 Atlas

$ atlas login a8m
You are now connected to "a8m" on Atlas Cloud.

复合模式

ent/schema 包主要用来定义 Ent 类型(对象),包括字段、边和逻辑等。 表触发器或任何其他数据库原生对象在 Ent 模式中没有相应的表达。 触发器函数可以定义一次,并在不同表中的多个触发器中使用。

为扩展 PostgreSQL 模式以包含 Ent 类型和他们的触发器,可以配置 Atlas 来读取 复合模式 数据源的模式状态。 跟着以下步骤来配置你的项目:

1. 定义一个简单的模式,具有两个类型(表):usersuser_audit_logs

ent/schema/user.go
// User holds the schema definition for the User entity.
type User struct {
ent.Schema
}

// Fields of the User.
func (User) Fields() []ent.Field {
return []ent.Field{
field.String("name"),
}
}

// UserAuditLog holds the schema definition for the UserAuditLog entity.
type UserAuditLog struct {
ent.Schema
}

// Fields of the UserAuditLog.
func (UserAuditLog) Fields() []ent.Field {
return []ent.Field{
field.String("operation_type"),
field.String("operation_time"),
field.String("old_value").
Optional(),
field.String("new_value").
Optional(),
}
}

现在假设我们想要记录 users 表的每一次变更日志并将其保存到 user_audit_logs 表。 为此我们需要在 INSERTUPDATEDELETE 操作上创建触发器函数并将其添加到 users 表。

2. 下一步我们定义一个触发器函数(audit_users_changes)并使用 CREATE TRIGGER 命令将其添加到 users

schema.sql
-- Function to audit changes in the users table.
CREATE OR REPLACE FUNCTION audit_users_changes()
RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Trigger for INSERT operations.
CREATE TRIGGER users_insert_audit AFTER INSERT ON users FOR EACH ROW EXECUTE FUNCTION audit_users_changes();

-- Trigger for UPDATE operations.
CREATE TRIGGER users_update_audit AFTER UPDATE ON users FOR EACH ROW EXECUTE FUNCTION audit_users_changes();

-- Trigger for DELETE operations.
CREATE TRIGGER users_delete_audit AFTER DELETE ON users FOR EACH ROW EXECUTE FUNCTION audit_users_changes();

3. 最后我们创建一个简单的 atlas.hcl 配置文件,配置文件具有包含在 schema.sql 和 Ent 模式中定义的自定义触发器的 composite_schema

atlas.hcl
data "composite_schema" "app" {
# Load the ent schema first with all tables.
schema "public" {
url = "ent://ent/schema"
}
# Then, load the triggers schema.
schema "public" {
url = "file://schema.sql"
}
}

env "local" {
src = data.composite_schema.app.url
dev = "docker://postgres/15/dev?search_path=public"
}

使用

设置完复合模式后,我们可以使用 atlas schema inspect 命令来查看其表示、生成迁移或将它们应用到数据库等。以下是几个帮助你开始使用 Atlas 的几个命令:

检查模式

atlas schema inspect 命令通常用来检查数据库。然而我们也可以用它来检查 composite_schema 并打印其 SQL 表示形式:

atlas schema inspect \
--env local \
--url env://src \
--format '{{ sql . }}'

以上命令打印下述 SQL。注意 audit_users_changes 函数和触发器在 usersuser_audit_logs 表之后定义:

-- Create "user_audit_logs" table
CREATE TABLE "user_audit_logs" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "operation_type" character varying NOT NULL, "operation_time" character varying NOT NULL, "old_value" character varying NULL, "new_value" character varying NULL, PRIMARY KEY ("id"));
-- Create "users" table
CREATE TABLE "users" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "name" character varying NOT NULL, PRIMARY KEY ("id"));
-- Create "audit_users_changes" function
CREATE FUNCTION "audit_users_changes" () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- Create trigger "users_delete_audit"
CREATE TRIGGER "users_delete_audit" AFTER DELETE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_insert_audit"
CREATE TRIGGER "users_insert_audit" AFTER INSERT ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_update_audit"
CREATE TRIGGER "users_update_audit" AFTER UPDATE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();

为模式生成迁移

运行以下命令为模式生成迁移:

atlas migrate diff \
--env local

注意可生成以下内容的新的迁移文件:

migrations/20240712090543.sql
-- Create "user_audit_logs" table
CREATE TABLE "user_audit_logs" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "operation_type" character varying NOT NULL, "operation_time" character varying NOT NULL, "old_value" character varying NULL, "new_value" character varying NULL, PRIMARY KEY ("id"));
-- Create "users" table
CREATE TABLE "users" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "name" character varying NOT NULL, PRIMARY KEY ("id"));
-- Create "audit_users_changes" function
CREATE FUNCTION "audit_users_changes" () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- Create trigger "users_delete_audit"
CREATE TRIGGER "users_delete_audit" AFTER DELETE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_insert_audit"
CREATE TRIGGER "users_insert_audit" AFTER INSERT ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_update_audit"
CREATE TRIGGER "users_update_audit" AFTER UPDATE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();

应用迁移

运行以下命令将生成的迁移应用到数据库:

atlas migrate apply \
--env local \
--url "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable"
将模式直接应用于数据库

有时需要在不生成迁移文件的时候将模式直接应用于数据库。例如,尝试模式变更、创建测试数据库等。这种情况下,可以使用下面的命令将模式直接应用于数据库:

atlas schema apply \
--env local \
--url "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable"

或使用 Atlas Go SDK

ac, err := atlasexec.NewClient(".", "atlas")
if err != nil {
log.Fatalf("failed to initialize client: %w", err)
}
// Automatically update the database with the desired schema.
// Another option, is to use 'migrate apply' or 'schema apply' manually.
if _, err := ac.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
Env: "local",
URL: "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable",
AutoApprove: true,
}); err != nil {
log.Fatalf("failed to apply schema changes: %w", err)
}

本指南的代码参见 GitHub