Improving upserts in Ecto and PostgreSQL
by Paweł Świątkowski
04 Jun 2024
Upserting is a very useful technique for working with databases. One of the most common use cases is when you are mirroring data from a different data source (different database or an API, for example) in your application’s local database. Ecto supports it quite well and has a dedicated section of its documentation about it. However, I found this can be improved even further, and I will show you how.
If you are familiar with upserts in general, you can safely skip the next three sections, where I explain what they are and what purpose they serve.
What is an upsert?
An upsert is basically “update or insert”. You try to insert a new record, but if inserting fails because of violating some constraint, you instead update the row that was the source of the violation.
In the most common case, the violation comes from the primary key. When you are mirroring the data from an external source, instead of setting the ID yourself, you reuse an ID already given by an external system.
Let’s say we consume some kind of notifications about user activity. They look like this:
{
  "id": "01HZJG8DKRN38DJ40BB8MRM3Z6",
  "kind": "SIGNED_IN",
  "t": "2024-06-04T12:01:56Z",
  "user": {
    "id": "123",
    "name": "John Smith",
    "avatar_url": null
  }
}
and then after some time
{
  "id": "01HZJG8W232DFKWXWQVR4BNHC1",
  "kind": "PROFILE_UPDATED",
  "t": "2024-06-04T12:32:16Z",
  "user": {
    "id": "123",
    "name": "John Smith",
    "avatar_url": "https://avata.rs/123.jpg"
  }
}
We want to save the notification, but we don’t want to attach user information to each one of them. That would waste disk space and leave us with stale data. Instead, we normalize this a bit by having an activities table with id, kind, occurred_at and user_id, and a users table with id, name and avatar_url. Every time we process a notification, we insert a new record into the activities table but upsert the record in the users table.
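The split described above could be sketched roughly like this. It is only an illustration: the NotificationSplitter module and its split/1 function are hypothetical names, and the sketch assumes the JSON has already been decoded into a map with string keys.

```elixir
defmodule NotificationSplitter do
  @moduledoc """
  Hypothetical helper (not part of the original code): splits a decoded
  notification into attributes for the activities and users tables.
  """

  # Expects the notification as a map with string keys.
  def split(%{"id" => id, "kind" => kind, "t" => t, "user" => user}) do
    # "t" is an ISO 8601 timestamp; it becomes occurred_at.
    {:ok, occurred_at, _utc_offset} = DateTime.from_iso8601(t)

    activity_attrs = %{
      id: id,
      kind: kind,
      occurred_at: occurred_at,
      user_id: user["id"]
    }

    user_attrs = %{
      id: user["id"],
      name: user["name"],
      avatar_url: user["avatar_url"]
    }

    {activity_attrs, user_attrs}
  end
end

notification = %{
  "id" => "01HZJG8W232DFKWXWQVR4BNHC1",
  "kind" => "PROFILE_UPDATED",
  "t" => "2024-06-04T12:32:16Z",
  "user" => %{
    "id" => "123",
    "name" => "John Smith",
    "avatar_url" => "https://avata.rs/123.jpg"
  }
}

# activity_attrs goes to a plain insert, user_attrs to the upsert.
{activity_attrs, user_attrs} = NotificationSplitter.split(notification)
```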
What problem does it solve?
Why can’t we just implement it naively like this?
def upsert_user(attrs) do
  case Repo.get(User, attrs.id) do
    # No such user yet: insert a new row.
    nil ->
      %User{}
      |> User.changeset(attrs)
      |> Repo.insert!()

    # The user already exists: update the existing row.
    user ->
      user
      |> User.changeset(attrs)
      |> Repo.update!()
  end
end
This will work. For some time. Until it won’t.
When your traffic grows, you will sooner or later receive two notifications about the same user at pretty much the same time. If you receive them via a webhook, they will be processed concurrently. What will inevitably happen is this:
[process 1]: Repo.get(User, 120) -> nil
[process 2]: Repo.get(User, 120) -> nil
[process 1]: Repo.insert!(%User{id: 120, ...}) -> :ok
[process 2]: Repo.insert!(%User{id: 120, ...}) -> Exception! ID 120 is already present
It’s a classic race condition and upserts exist exactly to avoid this issue.
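To see the interleaving without a database, here is a small simulation. It is a sketch under assumptions: FakeUsers is a hypothetical in-memory stand-in for the users table (an Agent holding a map with a uniqueness check on id), and the two "processes" are replayed sequentially in the problematic order rather than run in parallel, so the outcome is deterministic.

```elixir
defmodule FakeUsers do
  # Hypothetical in-memory stand-in for the users table.
  def start_link, do: Agent.start_link(fn -> %{} end)

  # Mimics Repo.get/2: returns the row or nil.
  def get(store, id), do: Agent.get(store, &Map.get(&1, id))

  # Mimics an insert guarded by a primary key constraint.
  def insert(store, %{id: id} = user) do
    Agent.get_and_update(store, fn users ->
      if Map.has_key?(users, id) do
        {{:error, :id_already_present}, users}
      else
        {{:ok, user}, Map.put(users, id, user)}
      end
    end)
  end
end

{:ok, store} = FakeUsers.start_link()

# Both "processes" check for the user before either of them inserts...
nil = FakeUsers.get(store, 120)
nil = FakeUsers.get(store, 120)

# ...so both decide to insert, and the second insert hits the constraint.
{:ok, _user} = FakeUsers.insert(store, %{id: 120, name: "John Smith"})
{:error, :id_already_present} = FakeUsers.insert(store, %{id: 120, name: "John Smith"})
```

The check and the write are two separate steps, so nothing stops another process from slipping in between them. An upsert closes that gap by making the database do both in a single statement.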
How is this solved?
In Ecto and PostgreSQL, upserts are done using the on_conflict option to Repo.insert!. In our case it would look like this:
def upsert_user(params) do
  %User{}
  |> User.changeset(params)
  |> Repo.insert!(
    conflict_target: [:id],
    on_conflict: {:replace_all_except, [:inserted_at]}
  )
end
The code is pretty descriptive, but just in case, in plain English this could be expressed as:
Try to insert a record using these values. If this fails because of a constraint on the id column, replace all values of the record causing the constraint violation, except inserted_at (we want to keep the original one).
This works nicely:
iex> Activities.upsert_user(%{id: 12, name: "John Smith", avatar_url: nil})
%Upsert.Activities.User{
  __meta__: #Ecto.Schema.Metadata<:loaded, "users">,
  id: 12,
  name: "John Smith",
  avatar_url: nil,
  inserted_at: ~U[2024-06-04 20:17:03Z],
  updated_at: ~U[2024-06-04 20:17:03Z]
}
iex> Activities.upsert_user(%{id: 12, name: "John Smith", avatar_url: "https://avata.rs/123.jpg"})
%Upsert.Activities.User{
  __meta__: #Ecto.Schema.Metadata<:loaded, "users">,
  id: 12,
  name: "John Smith",
  avatar_url: "https://avata.rs/123.jpg",
  inserted_at: ~U[2024-06-04 20:17:09Z],
  updated_at: ~U[2024-06-04 20:17:09Z]
}
iex> Repo.aggregate(User, :count)
1
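To make the semantics of {:replace_all_except, [:inserted_at]} more tangible, here is a tiny in-memory model of what happens to the stored row. It is only an approximation for illustration: UpsertModel is a hypothetical module, the "table" is a plain map keyed by id, and no database is involved.

```elixir
defmodule UpsertModel do
  # rows: map of id => row, standing in for the users table.
  # keep: columns that survive from the existing row on conflict.
  def upsert(rows, %{id: id} = new_row, keep \\ [:inserted_at]) do
    Map.update(rows, id, new_row, fn existing ->
      # Conflict on id: take everything from the new row,
      # except the columns we were asked to keep.
      Map.merge(new_row, Map.take(existing, keep))
    end)
  end
end

rows =
  %{}
  |> UpsertModel.upsert(%{
    id: 12,
    name: "John Smith",
    avatar_url: nil,
    inserted_at: ~U[2024-06-04 20:17:03Z],
    updated_at: ~U[2024-06-04 20:17:03Z]
  })
  |> UpsertModel.upsert(%{
    id: 12,
    name: "John Smith",
    avatar_url: "https://avata.rs/123.jpg",
    inserted_at: ~U[2024-06-04 20:17:09Z],
    updated_at: ~U[2024-06-04 20:17:09Z]
  })
```

In this model there is still a single row for id 12 after the second call. It has the new avatar_url and updated_at but the inserted_at of the first call.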
How to improve this?
This code worked great for us for months. But then, due to things happening, a new requirement was born. Now you need to emit an event to your event bus: either :user_created or :user_updated.
The problem is that with the upsert… we cannot really tell. We are just commanding the database to “do its thing” and we don’t know what it did.
We could use a trick: fetch the record we just upserted and check if inserted_at == updated_at. This would probably work, but perhaps there is a different way?
Of course there is. Otherwise, I wouldn’t be writing this post. Admittedly, it is a little bit hacky. It requires modifying the schema to expose an intimate PostgreSQL internal detail, so people who treat their schemas as domain entities will probably not like it. However, the overall result has at least some educational value, I think.
Let’s start by adding an xmax field to our schema. But as it’s a bit special, we don’t want it to be read unless explicitly requested.
field :xmax, :integer, load_in_query: false
What is xmax? It is one of PostgreSQL’s system columns. As we can read in the documentation, xmax is the ID of the transaction that deleted the row version, or zero if the row version was not deleted. But what does deletion have to do with anything? As it happens, in PostgreSQL, due to its MVCC model (Multiversion Concurrency Control), an update is actually a deletion followed by an insert of a new version. When the upsert ends up updating an existing row, the row we get back has a non-zero xmax, which is very fortunate for us! We just need to return it.
def upsert_user(params) do
  %User{}
  |> User.changeset(params)
  |> Repo.insert!(
    conflict_target: [:id],
    on_conflict: {:replace_all_except, [:inserted_at, :xmax]},
    returning: [:id, :xmax]
  )
end
Let’s try this out:
iex> Activities.upsert_user(%{id: 17, name: "Jane Doe", avatar_url: "https://avatar.url/123"})
%Upsert.Activities.User{
  __meta__: #Ecto.Schema.Metadata<:loaded, "users">,
  id: 17,
  name: "Jane Doe",
  avatar_url: "https://avatar.url/123",
  xmax: 0,
  inserted_at: ~U[2024-06-04 21:37:56Z],
  updated_at: ~U[2024-06-04 21:37:56Z]
}
Here xmax is zero, which means that it was an insert. Let’s run it again:
iex> Activities.upsert_user(%{id: 17, name: "Jane Doe", avatar_url: "https://avatar.url/123"})
%Upsert.Activities.User{
  __meta__: #Ecto.Schema.Metadata<:loaded, "users">,
  id: 17,
  name: "Jane Doe",
  avatar_url: "https://avatar.url/123",
  xmax: 137646,
  inserted_at: ~U[2024-06-04 21:38:00Z],
  updated_at: ~U[2024-06-04 21:38:00Z]
}
It’s non-zero, so this time it was an update. The last part is to use this information:
def upsert_user(params) do
  %User{}
  |> User.changeset(params)
  |> Repo.insert!(
    conflict_target: [:id],
    on_conflict: {:replace_all_except, [:inserted_at, :xmax]},
    returning: [:id, :xmax]
  )
  |> case do
    %{xmax: 0} -> :inserted
    _ -> :updated
  end
end
iex> Activities.upsert_user(%{id: 27, name: "Mark Twain"})
:inserted
iex> Activities.upsert_user(%{id: 27, name: "Mark Twain"})
:updated
Which is what we needed. With a little bit of secret database sauce, we managed to find out what our upsert did without resorting to tricks like comparing automatic timestamps. The price is a schema that is a bit less domain-pure and a bit more coupled to the database.
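Coming back to the original requirement, the event name can be derived from the same xmax check. The following is a minimal sketch, not the actual event bus integration: UserEvents and the {event, user_id} tuple shape are hypothetical, and plain maps stand in for the structs returned by Repo.insert!.

```elixir
defmodule UserEvents do
  # xmax == 0: the row was freshly inserted.
  def event_for(%{xmax: 0} = user), do: {:user_created, user.id}

  # Any other integer: the upsert took the update path.
  def event_for(%{xmax: xmax} = user) when is_integer(xmax), do: {:user_updated, user.id}
end

{:user_created, 27} = UserEvents.event_for(%{id: 27, xmax: 0})
{:user_updated, 27} = UserEvents.event_for(%{id: 27, xmax: 137_646})
```

Matching on the record rather than on :inserted or :updated keeps the user data at hand for the event payload. If xmax was not loaded (nil), neither clause matches and the call fails loudly instead of guessing.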
Coupling the schema to the database like this is a totally acceptable trade-off in my opinion. If you want a clean domain, you should convert Ecto schemas into purer structs, on which you cannot call Repo functions such as preload and which do not carry implementation details such as Ecto.Association.NotLoaded.
This is, however, a whole different discussion.