
Designing Fault-Tolerant AI Pipelines

Learn how to build resilient multimedia processing pipelines with Elixir and Oban

When I was working on Momento Baby, an AI search engine that lets you use natural language to search for images and videos, I had a problem: How do you process images and videos in a resilient and fault-tolerant way?

Processing multimedia is complex and expensive. So, I followed one of the main scalability principles I discussed in my article about keeping apps scalable with the Big-O mindset.

The naive solution would be to process everything in a single function, iterating over the list of imported images. However, I immediately thought about processing these images asynchronously. That's the scalable mindset!

If you are an Elixir engineer, you have certainly heard about Oban. Oban is one of the main background-job libraries in the Elixir ecosystem: it lets you run asynchronous work in a simple and organized way, backed by your database.

So, my first attempt followed this workflow:

  • A user imported multiple images
  • For each image, I created an Oban job (sketched right after this list)
  • I let Oban process items from the queue
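
Sketching the enqueueing side of that workflow (the context module and args shape here are my own illustration, not the actual Momento code; the single worker is the PhotoImportWorker I refer to later):

defmodule Momento.Photos.Import do
  # Hypothetical context function: fan out one Oban job per imported photo.
  def enqueue_imports(photos) do
    photos
    |> Enum.map(&Momento.PhotoImportWorker.new(%{"photo_id" => &1.id}))
    |> Oban.insert_all()
  end
end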

Each job had the following responsibilities:

  1. Download the image
  2. Generate a thumbnail
  3. Call the OpenAI Vision API to turn the image into structured JSON
  4. Embed that JSON
  5. Save the embeddings into a Postgres database

Another thing that was clear to me was that I needed a transaction, so that if anything in the process above failed, I could retry the job as many times as necessary.
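
Put together, that first version looked roughly like the sketch below. The helper names are made up for illustration; what matters is the shape: one job, one big transaction.

defmodule Momento.PhotoImportWorker do
  use Oban.Worker, queue: :photo_import, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"photo_id" => photo_id}}) do
    # Everything runs inside one transaction so that any failure rolls back
    # and Oban retries the whole job. All helpers below are hypothetical.
    Momento.Repo.transaction(fn ->
      image = Momento.Photos.download!(photo_id)                  # slow I/O
      thumb = Momento.Photos.generate_thumbnail!(image)           # CPU work
      json = Momento.Photos.describe_with_vision!(image)          # slow external API
      embedding = Momento.Photos.embed!(json)                     # external API call
      Momento.Photos.save_embedding!(photo_id, thumb, embedding)  # DB write
    end)
  end
end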

Locally, it worked perfectly. However, when I took this code to production, I immediately faced problems after importing 40 images in a single shot. Luckily, I was still stress-testing the system to see where it would fail.

What Happened

If you have followed my previous posts, you know that I use the least infrastructure required to run these experiments and I try to optimize costs as much as possible. I was running my app on Fly.io with a 1GB machine. 1GB isn't much these days, but I thought it could work somehow.

Once I opened the logs on PostHog, I saw hundreds of database connection errors. Specifically, timeouts!

That threw me off a little bit because the database operations were not that complex. However, I was using a transaction for the whole job. That meant I was holding a connection for ~40 seconds while many jobs were fighting for the same limited resource. AKA: connection starvation.

In other words: a transaction checks out a database connection from the pool, and it holds it until the transaction completes. If the job spends most of its time doing slow I/O, that connection sits idle but unavailable to other parts of the system.

An easy but smart solution

After researching a bit, I found out that I should separate the connections into two database pools: one for the web app, and one for the asynchronous jobs. That way, the app would continue working no matter what was happening in the background.

In Elixir, creating two pools is super easy:

# Configure Ecto to use UUID7 for binary_id autogeneration
config :momento,
  ecto_repos: [Momento.Repo, Momento.ObanRepo],
  generators: [timestamp_type: :utc_datetime, binary_id: true]
 
config :momento, Momento.Repo,
  migration_primary_key: [type: :binary_id],
  migration_foreign_key: [type: :binary_id]
 
# ObanRepo uses same schema settings (migrations run through Momento.Repo)
config :momento, Momento.ObanRepo,
  migration_primary_key: [type: :binary_id],
  migration_foreign_key: [type: :binary_id]
 
# Configure Oban for background jobs
# IMPORTANT: Uses ObanRepo (separate connection pool) to isolate worker traffic from web traffic
config :momento, Oban,
  repo: Momento.ObanRepo,
  plugins: [...],
  queues: [...]

This snippet assumes you start Momento.ObanRepo in your supervision tree, just like your main Momento.Repo. (Adding it to ecto_repos helps with tasks like mix ecto.create.)
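
For completeness, here is a minimal sketch of what those children might look like in the application module (names and ordering are illustrative):

# In Momento.Application (sketch): both repos start before Oban,
# since Oban needs its repo to be running.
children = [
  Momento.Repo,
  Momento.ObanRepo,
  {Oban, Application.fetch_env!(:momento, Oban)},
  # ... PubSub, Endpoint, and the rest of the tree
]

Supervisor.start_link(children, strategy: :one_for_one, name: Momento.Supervisor)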

The module definition for Oban's repo was trivial:

defmodule Momento.ObanRepo do
  use Ecto.Repo,
    otp_app: :momento,
    adapter: Ecto.Adapters.Postgres
 
  # Simple ping query to test connection health. Note that Ecto doesn't call
  # this automatically; wire it up via the :after_connect connection option,
  # e.g. `after_connect: {Momento.ObanRepo, :after_connect, []}` in the repo config.
  def after_connect(conn) do
    Postgrex.query(conn, "SELECT 1", [])
  end
end

With this quick fix, I had two separate database pools. The web traffic would never fight for connections with the asynchronous jobs. But that was not enough!

Database timeouts

After separating the database pools I was in a better position; however, I was still facing database connection errors. Let's take another look at what happened inside a job:

  1. Download the image
  2. Generate a thumbnail
  3. Call the OpenAI Vision API to turn the image into structured JSON
  4. Embed that JSON
  5. Save the embeddings into a Postgres database

Steps 1 and 3 took most of the time: the whole job execution took around 30 to 45 seconds, which means a single job held its connection for that entire window. No bueno at all!

The fix was to architect the job differently. This single job had too many responsibilities, and the best practice is to hold a database connection for as short a time as possible. So what was originally a single job (PhotoImportWorker) became three jobs:

  1. PhotoDownloadWorker
  2. PhotoAnalysisWorker
  3. PhotoStorageWorker

PhotoDownloadWorker

This job's responsibility was to download the image and generate a thumbnail. After that point, I only needed the thumbnail, so I saved it to S3 and passed its key along to the next job.

PhotoAnalysisWorker

This job's responsibility was to call the Vision API and generate the embedding. It then passes the embedding to the next job.
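
A simplified sketch of that middle step, with placeholder helpers standing in for the S3, OpenAI Vision, and embedding clients:

defmodule Momento.PhotoAnalysisWorker do
  use Oban.Worker, queue: :photo_analysis, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"photo_id" => photo_id, "thumb_key" => thumb_key}}) do
    # Slow external calls happen here, with no DB connection checked out.
    {:ok, thumb} = Momento.S3.get_object(thumb_key)       # hypothetical S3 helper
    {:ok, json} = Momento.Vision.describe(thumb)          # hypothetical Vision client
    {:ok, embedding} = Momento.Embeddings.embed(json)     # hypothetical embedding client

    # Hand off only the small artifacts the storage step needs.
    %{photo_id: photo_id, json: json, embedding: embedding}
    |> Momento.PhotoStorageWorker.new(unique: [fields: [:args], period: 300])
    |> Oban.insert()

    :ok
  end
end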

PhotoStorageWorker

Finally, this job updated the image's metadata, including the embedding, in the Postgres database so we could run queries against it later. A tiny but super important step was to broadcast the photo import so that LiveView could pick it up and update the gallery.

Phoenix.PubSub.broadcast(
  Momento.PubSub,
  "photo_imports:#{email}",
  {:photo_imported, image_metadata}
)
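
Putting that together, the storage worker's perform stays short, so the DB connection is held only for the actual write. As before, the save_analysis helper and the email field are placeholders:

defmodule Momento.PhotoStorageWorker do
  use Oban.Worker, queue: :photo_storage, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"photo_id" => photo_id, "embedding" => embedding} = args}) do
    # Short DB write: persist metadata + embedding (hypothetical helper).
    {:ok, image_metadata} = Momento.Photos.save_analysis(photo_id, embedding, args)

    # Tell LiveView about the newly imported photo.
    Phoenix.PubSub.broadcast(
      Momento.PubSub,
      "photo_imports:#{image_metadata.email}",
      {:photo_imported, image_metadata}
    )

    :ok
  end
end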

You might be wondering: Why not use Oban Workflows? I am using Oban's free tier and workflows are part of Oban Pro. Again, I wanted a cost-effective solution and this worked perfectly for what I needed.

Chaining jobs safely (free-tier workflow)

The core idea is simple: each worker does one thing, persists the smallest artifact needed to continue, and then enqueues the next job.

Here is a simplified example of how a download job can enqueue the analysis job:

defmodule Momento.PhotoDownloadWorker do
  use Oban.Worker, queue: :photo_download, max_attempts: 3
 
  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"photo_id" => photo_id} = args}) do
    # 1) Slow I/O: download + generate thumbnail (no DB transaction)
    %{thumbnail_bytes: thumb} = Momento.Photos.fetch_and_thumbnail(args)
 
    # 2) Fast I/O: store thumbnail under a deterministic key (idempotent)
    thumb_key = "thumbnails/#{photo_id}.jpg"
    :ok = Momento.S3.put_object(thumb_key, thumb)
 
    # 3) Hand off to the next job
    %{photo_id: photo_id, thumb_key: thumb_key}
    |> Momento.PhotoAnalysisWorker.new(unique: [fields: [:args], period: 300])
    |> Oban.insert()
 
    :ok
  end
end

This pattern makes retries safe because the work is idempotent: the thumbnail key is deterministic (so a retry overwrites), and the next job is unique for a short window so you don't enqueue duplicates.

The cherry on top of the cake

The best part of separating the workflow into different jobs was that I could also scale each job independently.

So, my configuration looked like this in the end:

config :momento, Oban,
  repo: Momento.ObanRepo,
  plugins: [
    # Keep jobs for 1 day
    {Oban.Plugins.Pruner, max_age: 86_400},
    {Oban.Plugins.Cron, crontab: []}
  ],
  queues: [
    # Photo import pipeline with 3 separate queues (workflow pattern)
    # Each queue optimized for its specific operation type
 
    # Step 1: Download & Thumbnail (fast, 1-2s per job)
    # High concurrency - these are fast and release connections quickly
    photo_download: 10,
 
    # Step 2: AI Analysis (slow, ~20-40s per job, and no DB needed during processing)
    # Medium concurrency - AI is the bottleneck, but workers don't hold DB connections
    photo_analysis: 5,
 
    # Step 3: Storage & DB Save (fast, 500ms per job)
    # High concurrency - these are fast final steps
    photo_storage: 10
  ]

These numbers are just examples of how you can configure the concurrency of each queue independently, given that each step puts a different kind of stress on the system. For example, the photo_analysis queue should be treated carefully, since you can hit OpenAI rate limits. That's one of the trade-offs to keep in mind when working with external services.
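
One pattern that helps here (a sketch of the general idea, not necessarily how Momento handles it) is to snooze the job when the API rate-limits you, so it simply runs again later:

# Inside the analysis worker's perform/1 (sketch); Momento.Vision and
# continue_with/1 are hypothetical placeholders.
case Momento.Vision.describe(thumb) do
  {:ok, json} ->
    continue_with(json)

  # HTTP 429: back off for a minute and let Oban re-run the job later.
  {:error, %{status: 429}} ->
    {:snooze, 60}

  {:error, reason} ->
    {:error, reason}
end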

Finally, I configured how many jobs an account could run simultaneously. This keeps the load evenly distributed across the system: otherwise, a heavy-usage account could take over the entire pipeline, and we want a good user experience for everyone.
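
Oban Pro has built-in support for this kind of partitioned limiting, but on the free tier you can approximate a per-account cap yourself. One way (a sketch, assuming each job carries an account_id string in its args) is to snooze a job when that account already has too many jobs executing:

defmodule Momento.PerAccountLimit do
  import Ecto.Query

  @max_per_account 3

  # Hypothetical helper: call it at the top of perform/1 and return its
  # result when it says {:snooze, _}.
  def check(account_id) do
    executing =
      Oban.Job
      |> where([j], j.state == "executing")
      |> where([j], fragment("?->>'account_id' = ?", j.args, ^account_id))
      |> Momento.ObanRepo.aggregate(:count, :id)

    # The count includes the job doing the check, hence strictly greater than.
    if executing > @max_per_account, do: {:snooze, 30}, else: :ok
  end
end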

Fault-tolerant systems

The title of this post promises fault tolerance, and the subtitle promises resilience. We have not called either out explicitly so far, but everything described above is exactly how you make a system fault tolerant. On top of that, Oban lets you configure how many times each job should be attempted when it fails:

defmodule Momento.PhotoStorageWorker do
  use Oban.Worker, queue: :photo_storage, max_attempts: 3

  # ... perform/1 omitted for brevity
end

When a job fails, it will be re-run after some time. Usually, we use exponential backoff to avoid stressing the system too much. Oban makes this easy: by configuring max_attempts, we ensure that transient failures can be retried safely.
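
Oban applies a backoff between retries out of the box, and you can override it per worker with the backoff/1 callback. For example, a simple capped exponential backoff (the numbers are arbitrary):

# Inside any of the worker modules above: retry after 30s, 60s, 120s, ...
# capped at one hour.
@impl Oban.Worker
def backoff(%Oban.Job{attempt: attempt}) do
  min(30 * Integer.pow(2, attempt - 1), 3_600)
end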

It is also important to design the code that runs in each job to be idempotent. This means that no matter how many times we execute a job, it should produce the same result (e.g. without creating duplicate entries in your database).
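
On the database side, a common way to get that idempotency with Ecto is an upsert keyed on the photo. A sketch (the schema, field names, and the unique index on photo_id are assumptions):

# Hypothetical storage helper: re-running it for the same photo_id
# overwrites the embedding instead of creating a duplicate row.
def save_embedding(photo_id, embedding) do
  %Momento.Photos.Embedding{photo_id: photo_id, embedding: embedding}
  |> Momento.Repo.insert(
    on_conflict: {:replace, [:embedding]},
    conflict_target: :photo_id
  )
end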

Conclusion (Key takeaways)

To build a resilient multimedia pipeline, you want to protect scarce resources (like DB connections), keep each background job small and retryable, and scale each step independently. Oban gives you the primitives to do that well—queues, retries, and backoff—while you keep each step idempotent.

In practice, it comes down to:

  • Split DB pools to protect web traffic from worker traffic.
  • Don’t hold DB transactions open while doing slow I/O (downloads, external APIs).
  • Split long pipelines into multiple jobs so each step holds a DB connection for the shortest possible time.
  • Scale concurrency per queue; treat external API queues carefully to respect rate limits.
  • Make jobs idempotent so retries are safe (deterministic storage keys, upserts, and unique jobs where it makes sense).

Let me know what you think and how you are using Oban in production.