数据迁移
迁移一般用来变更数据库模式,但在有些情况下,需要修改数据库中的数据。 例如,添加种子数据或使用自定义默认值回填空列。
这种类型的迁移称之为数据迁移。在本文档中,我们将会探讨如何使用 Ent 来计划数据迁移,并将它们集成到常规的迁移工作流中。
迁移类型
Ent 目前支持两种迁移,版本化迁移 和 声明式迁移 (自动化迁移)。数据迁移在这两种迁移中都可以运行。
版本化迁移
使用版本化迁移时,数据迁移需存储在相同的 migrations 目录中并以常规迁移同样的方式执行。
建议将数据迁移和模式迁移分布存储在不同的文件中以便测试。
迁移使用的格式是 SQL,即使 Ent 模式被修改导致生成的代码与数据迁移文件不再兼容的情况下,数据迁移文件也可以安全执行(且无需修改存储内容)。
创建数据迁移脚本有两种方式:手动编写和自动生成。通过手动编辑,用户可编写所有 SQL 语句并精确控制执行内容。 另一种方式是使用 Ent 自动生成数据迁移脚本。建议验证生成的文件是否正确,因为某些情况下可能需要手动修正或编辑。
手动创建
1. 如何你没有安装 Atlas,请参阅 快速开始 指南。
2. 使用 Atlas 创建新的迁移文件:
atlas migrate new <migration_name> \
--dir "file://my/project/migrations"
3. 编辑迁移文件并添加自定义数据。例如:
-- Backfill NULL or null tags with a default value.
UPDATE `users` SET `tags` = '["foo","bar"]' WHERE `tags` IS NULL OR JSON_CONTAINS(`tags`, 'null', '$');
4. 更新迁移目录下的 完整性文件:
atlas migrate hash \
--dir "file://my/project/migrations"
如果你不确定如何测试数据迁移文件请参见后面的 测试 部分。
生成脚本
目前,Ent 提供了生成数据迁移文件的初步支持。 通过使用此选项,用户在多数情况下可简化手动编写复杂 SQL 语句的过程。 但仍建议验证生成的文件是否正确,因为在某些特殊情况下可能需要手动编辑。
1. 创建 版本化迁移配置,如何尚未设置的话。
2. 创建第一个数据迁移函数。你可以在下面找到如何写此函数的演示:
- Single Statement
- Multi Statement
- Data Seeding
package migratedata
// BackfillUnknown back-fills all empty users' names with the default value 'Unknown'.
func BackfillUnknown(dir *migrate.LocalDir) error {
w := &schema.DirWriter{Dir: dir}
client := ent.NewClient(ent.Driver(schema.NewWriteDriver(dialect.MySQL, w)))
// Change all empty names to 'unknown'.
err := client.User.
Update().
Where(
user.NameEQ(""),
).
SetName("Unknown").
Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating statement: %w", err)
}
// Write the content to the migration directory.
return w.FlushChange(
"unknown_names",
"Backfill all empty user names with default value 'unknown'.",
)
}
然后使用 ent/migrate/main.go 中使用此函数生成下面的迁移文件:
-- Backfill all empty user names with default value 'unknown'.
UPDATE `users` SET `name` = 'Unknown' WHERE `users`.`name` = '';
package migratedata
// BackfillUserTags is used to generate the migration file '20221126185750_backfill_user_tags.sql'.
func BackfillUserTags(dir *migrate.LocalDir) error {
w := &schema.DirWriter{Dir: dir}
client := ent.NewClient(ent.Driver(schema.NewWriteDriver(dialect.MySQL, w)))
// Add defaults "foo" and "bar" tags for users without any.
err := client.User.
Update().
Where(func(s *sql.Selector) {
s.Where(
sql.Or(
sql.IsNull(user.FieldTags),
sqljson.ValueIsNull(user.FieldTags),
),
)
}).
SetTags([]string{"foo", "bar"}).
Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating backfill statement: %w", err)
}
// Document all changes until now with a custom comment.
w.Change("Backfill NULL or null tags with a default value.")
// Append the "org" special tag for users with a specific prefix or suffix.
err = client.User.
Update().
Where(
user.Or(
user.NameHasPrefix("org-"),
user.NameHasSuffix("-org"),
),
// Append to only those without this tag.
func(s *sql.Selector) {
s.Where(
sql.Not(sqljson.ValueContains(user.FieldTags, "org")),
)
},
).
AppendTags([]string{"org"}).
Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating backfill statement: %w", err)
}
// Document all changes until now with a custom comment.
w.Change("Append the 'org' tag for organization accounts in case they don't have it.")
// Write the content to the migration directory.
return w.Flush("backfill_user_tags")
}
然后使用 ent/migrate/main.go 中使用此函数生成下面的迁移文件:
-- Backfill NULL or null tags with a default value.
UPDATE `users` SET `tags` = '["foo","bar"]' WHERE `tags` IS NULL OR JSON_CONTAINS(`tags`, 'null', '$');
-- Append the 'org' tag for organization accounts in case they don't have it.
UPDATE `users` SET `tags` = CASE WHEN (JSON_TYPE(JSON_EXTRACT(`tags`, '$')) IS NULL OR JSON_TYPE(JSON_EXTRACT(`tags`, '$')) = 'NULL') THEN JSON_ARRAY('org') ELSE JSON_ARRAY_APPEND(`tags`, '$', 'org') END WHERE (`users`.`name` LIKE 'org-%' OR `users`.`name` LIKE '%-org') AND (NOT (JSON_CONTAINS(`tags`, '"org"', '$') = 1));
package migratedata
// SeedUsers add the initial users to the database.
func SeedUsers(dir *migrate.LocalDir) error {
w := &schema.DirWriter{Dir: dir}
client := ent.NewClient(ent.Driver(schema.NewWriteDriver(dialect.MySQL, w)))
// The statement that generates the INSERT statement.
err := client.User.CreateBulk(
client.User.Create().SetName("a8m").SetAge(1).SetTags([]string{"foo"}),
client.User.Create().SetName("nati").SetAge(1).SetTags([]string{"bar"}),
).Exec(context.Background())
if err != nil {
return fmt.Errorf("failed generating statement: %w", err)
}
// Write the content to the migration directory.
return w.FlushChange(
"seed_users",
"Add the initial users to the database.",
)
}
然后使用 ent/migrate/main.go 中使用此函数生成下面的迁移文件:
-- Add the initial users to the database.
INSERT INTO `users` (`age`, `name`, `tags`) VALUES (1, 'a8m', '["foo"]'), (1, 'nati', '["bar"]');
3. 为使生成的文件也被编辑,迁移目录 完整性文件 需用以下命令更新:
atlas migrate hash \
--dir "file://my/project/migrations"
测试
添加迁移文件后,强烈建议你在本地数据库上应用这些文件,以确保其有效性并达到预期效果。以下流程可手动执行,也可通过程序实现自动化操作。
1. 执行所有迁移文件直至最后创建的文件——数据迁移文件:
# Total number of files.
number_of_files=$(ls ent/migrate/migrations/*.sql | wc -l)
# Execute all files without the latest.
atlas migrate apply $[number_of_files-1] \
--dir "file://my/project/migrations" \
-u "mysql://root:pass@localhost:3306/test"
2. 确保最后一个迁移文件处于待执行状态:
atlas migrate status \
--dir "file://my/project/migrations" \
-u "mysql://root:pass@localhost:3306/test"
Migration Status: PENDING
-- Current Version: <VERSION_N-1>
-- Next Version: <VERSION_N>
-- Executed Files: <N-1>
-- Pending Files: 1
3. 在运行数据迁移文件之前,用代表生产数据库的临时数据填充本地数据库。
4. 运行 atlas migrate apply 并确认成功执行。
atlas migrate apply \
--dir "file://my/project/migrations" \
-u "mysql://root:pass@localhost:3306/test"
注意使用 atlas schema clean 可以清空本地开发数据库并重复这一过程,直到数据迁移文件得到想要的结果。
自动化迁移
在声明式工作流中,数据迁移通过 Diff 或 Apply 钩子 实现。
这是因为与版本化选项不同,此类迁移在应用时不保留名称或版本信息。
因此,当使用钩子写入数据时,必须在执行前检查 schema.Change 的类型,以确保数据迁移未被重复应用。
func FillNullValues(dbdialect string) schema.ApplyHook {
return func(next schema.Applier) schema.Applier {
return schema.ApplyFunc(func(ctx context.Context, conn dialect.ExecQuerier, plan *migrate.Plan) error {
// Search the schema.Change that triggers the data migration.
hasC := func() bool {
for _, c := range plan.Changes {
m, ok := c.Source.(*schema.ModifyTable)
if ok && m.T.Name == user.Table && schema.Changes(m.Changes).IndexModifyColumn(user.FieldName) != -1 {
return true
}
}
return false
}()
// Change was found, apply the data migration.
if hasC {
// At this stage, there are three ways to UPDATE the NULL values to "Unknown".
// Append a custom migrate.Change to migrate.Plan, execute an SQL statement
// directly on the dialect.ExecQuerier, or use the generated ent.Client.
// Create a temporary client from the migration connection.
client := ent.NewClient(
ent.Driver(sql.NewDriver(dbdialect, sql.Conn{ExecQuerier: conn.(*sql.Tx)})),
)
if err := client.User.
Update().
SetName("Unknown").
Where(user.NameIsNil()).
Exec(ctx); err != nil {
return err
}
}
return next.Apply(ctx, conn, plan)
})
}
}
更多示例, Apply 钩子 示例部分。