Learn More

Syncing Stripe to Postgres

2021-07-8|⌛️8 min|acco

At Sync Inc, we replicate APIs to Postgres databases in real-time. We want to provide our customers with the experience of having direct, row-level access to their data from third-party platforms, like Stripe and Airtable.

Stripe's API is great, so we knew our Stripe support would have to be very fast and reliable on day one. In order to replace API reads, our sync needs to create a database that is truly a second source of truth. This means:

  1. We need to backfill all data: On initial sync, we load in all historical data for a given Stripe account into the target Postgres database.
  2. Support virtual objects: Stripe has a few "virtual" objects, like "upcoming invoices." These objects are in constant flux until they are created (eg an upcoming invoice becomes an invoice.) You have to fetch these one-off as there's no place to paginate them. They don't even have primary keys.
  3. We provide a /wait endpoint: As you'll see, customers can call the /wait endpoint on Sync Inc after writing changes to Stripe. This endpoint will return a 200 when we've confirmed the database is completely up-to-date. This means they can read from their database after writing to Stripe and know it's consistent.

Two primary sync strategies

Our Stripe sync orbits around Stripe's events endpoint. This endpoint serves the same purpose as a replication slot on a database. It contains a list of all create/update/delete events that have happened for a given account on Stripe.

Each event contains a full payload of the affected record. We can use this event stream to effectively playback all changes to a Stripe account.

However, as you might expect, the endpoint does not contain an unbounded list of all events in a Stripe account ever. It contains data from the last 30 days.

So, this means that when our customers start up a new replica Postgres database, we need to backfill it with historical Stripe data first. Backfilling just means paginating each and every endpoint back to the beginning of the Stripe account.

What we ended up with was two distinct sync processes: Our backfill process and our events polling process.

We run the backfill process first to build up the initial database. Then we run the events polling process continuously over the lifetime of the database to keep it in sync.

Sync process: The backfill

During the backfill, we need to paginate through the full history of each endpoint on Stripe.

Given the breadth of the Stripe API, there are a few of challenges the backfill poses:

  • We need to make requests to dozens of API endpoints.
  • Then, for each endpoint, we have to convert the JSON response into a structure that's ready to insert into Postgres.
  • Further, each response can contain several layers of nested children. Those children can be lists of children which are, in turn, paginateable.

This was a great excuse to use Elixir's Broadway. A Broadway pipeline consists of one producer and one or more workers. The producer is in charge of producing jobs. The workers consume and work those jobs, each working in parallel. Broadway gives us a few things out of the box:

  • A worker queue with back-pressure.
  • We can dynamically scale the number of workers in the pipeline based on the amount of work to do.
  • We can easily rate limit the volume of work we process per unit of time. We tuned this to stay well below Stripe's API quota limit.
  • A "message" construct with acknowledge/fail behaviors. This made things like retry logic trivial.

In our case, the queue of work the producer maintains is a list of pages to be processed. A page is the combination of an endpoint and the current cursor for that endpoint. Here's a small example:

queue = [
{"/v1/customers", "cur9sjkxi1x"},
{"/v1/invoices", "cur0pskoxiq1"},
# ...
]

To configure throughput, we just instantiate Broadway with a few parameters:

options = [
producer: [
module: BackfillProducer,
rate_limiting: [
allowed_messages: 50,
interval: :timer.seconds(1)
]
],
processors: [
default: [
concurrency: 50,
max_demand: 1
]
]
]

That rate_limiting setting is all we need to ensure we process no more than 50 pages per second. This leaves a comfy 50 requests per second left over in a customer's Stripe quota.

Under processors, we specify that we want up to 50 concurrent workers and that each may request one unit of work per time (in our case, a page).

So, to kick off the sync, the backfill producer's queue is seeded with all Stripe endpoints (and nil cursors). Our workers checkout a page to work and fetch it. Each page contains up to 100 objects. Each of those objects can contain a list of paginateable children. As such, the worker's first job is to populate all objects in the page completely.

Once we have a list of "filled out" objects, we parse and insert them. We use a large JSON object which maps object types and their fields to tables and columns in Postgres. We benefit greatly from the fact that every Stripe object contains an object field which identifies what the entity is.

Sync process: new events

After the backfill completes, it's time to switch to processing events for the indefinite lifetime of the sync. But we need a smooth hand-off between the two, otherwise we risk missing a change.

To facilitate the hand-off, before the backfill begins we make a request to /events to grab the most recent cursor. After the backfill completes, we first catch up on all /events that occurred while we were backfilling. After those are processed, the database is up-to-date. And it's time to poll /events indefinitely.

We poll the /events endpoint every 500ms to check to see if there’s anything new to process, continuously. This is how we can promise "sub-second" lag.

We log sync completions to a Postgres table. We use the "polymorphic embed" pattern, where each log entry contains a JSON payload that can take one of several shapes. For example, our "Stripe backfill complete" log looks like this:

{
"kind": "stripe_backfill_complete",
"row_count": 1830,
"last_event_before_backfill": "evt_1J286oDXGuvRIWUJKfUqKpsJ"
}

When we boot a Stripe sync process, it checks the sync logs table for the most recent completed sync for this database. Our sync manager then knows what kind of sync process we need to boot and the initial state of that process.

What about webhooks?

When one hears about a "real-time" or "evented" API integration, the first API primitive that leaps to mind is "webhooks."

But webhooks come with a few challenges:

  1. You can't go down: Senders typically retry undelivered webhooks with some exponential back-off. But the guarantees are often loose or unclear. And the last thing your system probably needs after recovering from a disaster is a deluge of backed-up webhooks to handle.
  2. You're counting on sender deliverability: When polling, the only real barrier between you and the latest data is a possible caching layer. With webhooks, senders will typically have some sort of queue or "outbox" that their workers work through. Queues like this are subject to back-pressure. This opens you up to your sync slowing down if your sender's queue backs up.
  3. A redundant system: Webhooks are not something we can rely on exclusively for a syncing operation like this, so they'll always need to be complemented by a polling system. We have to poll to backfill the database after it initializes. And we may have to poll after recovering from downtime or after fixing a bug in our webhook handling logic.

In general, I have this suspicion that a system that relies purely on webhooks to stay in sync is bound to fail. All it takes is for one webhook to get dropped, on either the receiving end or sending end. With no other backup mechanisms in place, you risk a record in your database being out of sync indefinitely.

Luckily, it turns out that with an /events endpoint to poll, webhooks are not necessary. The trick is to just poll it frequently enough to get as close to real-time as possible! What's great is that you can use the same sync system to get a change made milliseconds ago or to catch up on all changes that happened during unexpected downtime.

The "wait" endpoint

Our databases are read-only. Customers make writes to a platform's API, so that those writes can go through a platform's validation stack. Then, those changes flow down to their database. This is the one-way data flow we advocate.

To make our Stripe database a true second source of truth, we need a final pillar. We need to enable "read after writes," or the guarantee that if you make a write to Stripe's API, that write will be reflected in your database in a subsequent read. While our Stripe sync is fast, the architecture at present leaves open a race condition: You can make a write to Stripe then query your database before the change has propagated.

The simplest way to overcome this race condition is to sleep for one second before any subsequent reads. This should work almost all the time. But we wanted to provide something even more robust.

Customers can instead call our "wait" endpoint:

GET <https://api.syncinc.so/api/stripe/wait/:id>

This endpoint will hold open until we've confirmed that your Stripe database is up-to-date. When it is, the request returns with a 200. You can now make a subsequent read to your database with confidence.

Coming up

With backfills, support for virtual objects like "upcoming invoices," and sub-second sync times, we provide a true Postgres replica with all your Stripe data.

We still have a lot of work to do to make the developer experience around this database great. There are 82 tables to wrap one's head around (!!) and robust ORM support is a must for many developers. Now that the foundations of our sync are in place, stay tuned for updates to the overall experience.

Skip the API. Access your data from SaaS platforms like Airtable or Stripe without leaving your database. Syncs to Postgres in real-time. Get started free.