Using Database Triggers in Ent Schema
Triggers are useful tools in relational databases: they let you run custom code when specific events occur on a table. For instance, a trigger can automatically populate an audit log table whenever a mutation is applied to another table. This ensures that all changes, including those made by other applications, are recorded in detail, enforcing the rule at the database level and reducing the need for extra code in each application.
This guide explains how to attach triggers to your Ent types (objects) and configure the schema migration so that both the triggers and the Ent schema are managed as a single migration unit using Atlas.
Atlas support for triggers is available exclusively to Pro users. To use this feature, run:
atlas login
Install Atlas
To install the latest version of Atlas, simply run one of the following commands in your terminal, or check out the Atlas website:
- macOS + Linux
curl -sSf https://atlasgo.sh | sh
- Homebrew
brew install ariga/tap/atlas
- Docker
docker pull arigaio/atlas
docker run --rm arigaio/atlas --help
If the container needs access to the host network or to a local directory, use the --net=host flag and mount the desired directory:
docker run --rm --net=host \
  -v $(pwd)/migrations:/migrations \
  arigaio/atlas migrate apply \
  --url "mysql://root:pass@:3306/test"
- Windows
Download the latest release and add the directory containing the atlas binary to your system PATH.
Log in to Atlas
$ atlas login a8m
You are now connected to "a8m" on Atlas Cloud.
Composite Schema
The ent/schema package is mostly used for defining Ent types (objects), their fields, edges, and logic.
Table triggers, or any other database native objects, have no representation in the Ent schema.
A trigger function can be defined once and then used by multiple triggers on different tables.
To extend our PostgreSQL schema so that it includes both our Ent types and their triggers, we configure Atlas to read the schema state from a composite schema data source. Follow these steps to configure your project:
1. Define a simple schema with two types (tables): users and user_audit_logs:
package schema

import (
	"entgo.io/ent"
	"entgo.io/ent/schema/field"
)

// User holds the schema definition for the User entity.
type User struct {
ent.Schema
}
// Fields of the User.
func (User) Fields() []ent.Field {
return []ent.Field{
field.String("name"),
}
}
// UserAuditLog holds the schema definition for the UserAuditLog entity.
type UserAuditLog struct {
ent.Schema
}
// Fields of the UserAuditLog.
func (UserAuditLog) Fields() []ent.Field {
return []ent.Field{
field.String("operation_type"),
field.String("operation_time"),
field.String("old_value").
Optional(),
field.String("new_value").
Optional(),
}
}
Now, suppose we want to log every change to the users table and save it in the user_audit_logs table.
To achieve this, we need to create a trigger function that runs on INSERT, UPDATE, and DELETE operations and attach it to the users table.
2. Next, we define a trigger function (audit_users_changes) and attach it to the users table using the CREATE TRIGGER commands:
-- Function to audit changes in the users table.
CREATE OR REPLACE FUNCTION audit_users_changes()
RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Trigger for INSERT operations.
CREATE TRIGGER users_insert_audit AFTER INSERT ON users FOR EACH ROW EXECUTE FUNCTION audit_users_changes();
-- Trigger for UPDATE operations.
CREATE TRIGGER users_update_audit AFTER UPDATE ON users FOR EACH ROW EXECUTE FUNCTION audit_users_changes();
-- Trigger for DELETE operations.
CREATE TRIGGER users_delete_audit AFTER DELETE ON users FOR EACH ROW EXECUTE FUNCTION audit_users_changes();
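Before wiring these objects into the migration flow, it can help to sanity-check the trigger behavior by hand. Below is an illustrative check only, assuming a scratch database where the two tables, the audit_users_changes function, and the three triggers above already exist:

-- Illustrative check only: run against a scratch database that already
-- contains the tables, function, and triggers defined above.
INSERT INTO users (name) VALUES ('a8m');
UPDATE users SET name = 'nati' WHERE name = 'a8m';
DELETE FROM users WHERE name = 'nati';

-- Each statement above should have produced one audit row, with
-- operation_type set to INSERT, UPDATE, and DELETE respectively.
SELECT operation_type, old_value, new_value FROM user_audit_logs ORDER BY id;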
3. Lastly, we create a simple atlas.hcl config file with a composite_schema that includes both our Ent schema and the custom triggers defined in schema.sql:
data "composite_schema" "app" {
# Load the ent schema first with all tables.
schema "public" {
url = "ent://ent/schema"
}
# Then, load the triggers schema.
schema "public" {
url = "file://schema.sql"
}
}
env "local" {
src = data.composite_schema.app.url
dev = "docker://postgres/15/dev?search_path=public"
}
Usage
After setting up our composite schema, we can get its representation using the atlas schema inspect command, generate schema migrations for it, apply them to a database, and more. Below are a few commands to get you started with Atlas:
Inspect the Schema
The atlas schema inspect command is commonly used to inspect databases. However, we can also use it to inspect our composite_schema and print its SQL representation:
atlas schema inspect \
--env local \
--url env://src \
--format '{{ sql . }}'
The command above prints the following SQL. Note that the audit_users_changes function and the triggers are defined after the users and user_audit_logs tables:
-- Create "user_audit_logs" table
CREATE TABLE "user_audit_logs" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "operation_type" character varying NOT NULL, "operation_time" character varying NOT NULL, "old_value" character varying NULL, "new_value" character varying NULL, PRIMARY KEY ("id"));
-- Create "users" table
CREATE TABLE "users" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "name" character varying NOT NULL, PRIMARY KEY ("id"));
-- Create "audit_users_changes" function
CREATE FUNCTION "audit_users_changes" () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- Create trigger "users_delete_audit"
CREATE TRIGGER "users_delete_audit" AFTER DELETE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_insert_audit"
CREATE TRIGGER "users_insert_audit" AFTER INSERT ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_update_audit"
CREATE TRIGGER "users_update_audit" AFTER UPDATE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
Generate Migrations For the Schema
To generate a migration for the schema, run the following command:
atlas migrate diff \
--env local
Note that a new migration file was created with the following content:
-- Create "user_audit_logs" table
CREATE TABLE "user_audit_logs" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "operation_type" character varying NOT NULL, "operation_time" character varying NOT NULL, "old_value" character varying NULL, "new_value" character varying NULL, PRIMARY KEY ("id"));
-- Create "users" table
CREATE TABLE "users" ("id" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY, "name" character varying NOT NULL, PRIMARY KEY ("id"));
-- Create "audit_users_changes" function
CREATE FUNCTION "audit_users_changes" () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value, new_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO user_audit_logs(operation_type, operation_time, old_value)
VALUES (TG_OP, CURRENT_TIMESTAMP, row_to_json(OLD));
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- Create trigger "users_delete_audit"
CREATE TRIGGER "users_delete_audit" AFTER DELETE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_insert_audit"
CREATE TRIGGER "users_insert_audit" AFTER INSERT ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
-- Create trigger "users_update_audit"
CREATE TRIGGER "users_update_audit" AFTER UPDATE ON "users" FOR EACH ROW EXECUTE FUNCTION "audit_users_changes"();
Apply the Migrations
To apply the migration generated above to a database, run the following command:
atlas migrate apply \
--env local \
--url "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable"
Sometimes, there is a need to apply the schema directly to the database without generating a migration file, for example, when experimenting with schema changes or spinning up a database for testing. In such cases, you can use the command below to apply the schema directly to the database:
atlas schema apply \
--env local \
--url "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable"
Or, using the Atlas Go SDK:
ac, err := atlasexec.NewClient(".", "atlas")
if err != nil {
log.Fatalf("failed to initialize client: %w", err)
}
// Automatically update the database with the desired schema.
// Another option is to use 'migrate apply' or 'schema apply' manually.
if _, err := ac.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
Env: "local",
URL: "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable",
AutoApprove: true,
}); err != nil {
log.Fatalf("failed to apply schema changes: %w", err)
}
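With the schema applied, any mutation made through the generated Ent client is audited by the database itself, with no extra application code. The following is a minimal sketch of this in action; it assumes the package generated from the schema above lives under a hypothetical your/module/ent import path and that the database from the previous step is reachable:

package main

import (
	"context"
	"fmt"
	"log"

	"your/module/ent" // hypothetical path of the generated Ent package

	_ "github.com/lib/pq"
)

func main() {
	ctx := context.Background()
	client, err := ent.Open("postgres", "postgres://postgres:pass@localhost:5432/database?search_path=public&sslmode=disable")
	if err != nil {
		log.Fatalf("failed opening connection to postgres: %v", err)
	}
	defer client.Close()

	// The INSERT and UPDATE below fire the audit triggers at the database
	// level; the application writes no audit code itself.
	u, err := client.User.Create().SetName("a8m").Save(ctx)
	if err != nil {
		log.Fatalf("failed creating user: %v", err)
	}
	if _, err := u.Update().SetName("nati").Save(ctx); err != nil {
		log.Fatalf("failed updating user: %v", err)
	}

	// Two audit rows are expected: one for the INSERT and one for the UPDATE.
	logs, err := client.UserAuditLog.Query().All(ctx)
	if err != nil {
		log.Fatalf("failed querying audit logs: %v", err)
	}
	for _, l := range logs {
		fmt.Println(l.OperationType, l.NewValue)
	}
}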
The code for this guide can be found on GitHub.