04 – Context Boundaries
Overview
Sound Forge Alchemy organizes its domain logic into five context boundaries following the Phoenix convention of bounded contexts. Each context owns its schemas, business logic, and external interface. Cross-context communication happens through explicit function calls and PubSub events – never through shared database queries or implicit coupling.
flowchart TD
subgraph Music["Music Context"]
Track
DownloadJob
ProcessingJob
AnalysisJob
Stem
AnalysisResult
end
subgraph Spotify["Spotify Context"]
URLParser
ClientBehaviour["Client (behav.)"]
HTTPClient
end
subgraph Jobs["Jobs Context"]
Download
Processing
Analysis
DownloadWorker
end
subgraph Audio["Audio Context"]
AnalyzerPort
DemucsPort
end
subgraph Storage["Storage Context"]
downloads["downloads/"]
stems["stems/"]
analysis["analysis/"]
end
Music <-->|metadata| Spotify
Spotify -->|enqueue| Jobs
Jobs -->|persists via| Music
Music -->|analyze/separate| Audio
Audio -->|files| Storage
Context Definitions
Music Context (SoundForge.Music)
The Music context is the core data domain. It owns all Ecto schemas and provides CRUD operations for every persistent entity in the system. Other contexts depend on Music for data access but never reach into its schemas directly.
Module: lib/sound_forge/music.ex
Schemas owned:
- SoundForge.Music.Track – core entity, represents a music track
- SoundForge.Music.DownloadJob – tracks download progress and state
- SoundForge.Music.ProcessingJob – tracks stem separation progress
- SoundForge.Music.AnalysisJob – tracks audio analysis progress
- SoundForge.Music.Stem – individual stem file metadata (vocals, drums, bass, other)
- SoundForge.Music.AnalysisResult – extracted audio features (tempo, key, energy, spectral)
Public API:
# Track operations
Music.list_tracks() # => [%Track{}, ...]
Music.search_tracks("beatles") # => [%Track{artist: "The Beatles", ...}]
Music.get_track!(id) # => %Track{} | raises
Music.create_track(%{title: "Song"}) # => {:ok, %Track{}} | {:error, changeset}
Music.update_track(track, %{artist: "..."}) # => {:ok, %Track{}} | {:error, changeset}
Music.delete_track(track) # => {:ok, %Track{}} | {:error, changeset}
# Job operations (download, processing, analysis)
Music.get_download_job!(id)
Music.create_download_job(%{track_id: id, status: :queued})
Music.update_download_job(job, %{status: :completed, progress: 100})
Music.get_processing_job!(id)
Music.create_processing_job(%{track_id: id, model: "htdemucs"})
Music.update_processing_job(job, attrs)
Music.get_analysis_job!(id)
Music.create_analysis_job(%{track_id: id, status: :queued})
Music.update_analysis_job(job, attrs)
# Stem and result operations
Music.list_stems_for_track(track_id)
Music.create_stem(%{track_id: id, processing_job_id: jid, stem_type: :vocals})
Music.get_analysis_result_for_track(track_id)
Music.create_analysis_result(%{track_id: id, analysis_job_id: jid, tempo: 120.0})
Boundary rules:
- Only Music context modules directly import Ecto.Query for these schemas
- Controllers and LiveViews call Music context functions, never Repo.get directly
- All changeset validation lives inside the schema modules
Spotify Context (SoundForge.Spotify)
The Spotify context encapsulates all interaction with the Spotify Web API. It owns URL parsing, OAuth token management, and API request logic. The rest of the application only sees fetch_metadata/1 – the implementation details of authentication and HTTP transport are completely hidden.
Modules:
- lib/sound_forge/spotify.ex – public API, routing by resource type
- lib/sound_forge/spotify/url_parser.ex – regex-based URL parsing
- lib/sound_forge/spotify/client.ex – behaviour definition
- lib/sound_forge/spotify/http_client.ex – Req-based implementation with ETS token cache
Public API:
SoundForge.Spotify.fetch_metadata("https://open.spotify.com/track/abc123")
# => {:ok, %{"id" => "abc123", "name" => "Song Name", "artists" => [...]}}
SoundForge.Spotify.fetch_metadata("invalid-url")
# => {:error, :invalid_spotify_url}
Internal architecture:
# The context module dispatches to the configured client implementation
defmodule SoundForge.Spotify do
def fetch_metadata(url) do
with {:ok, %{type: type, id: id}} <- URLParser.parse(url) do
client = spotify_client() # configurable for testing
case type do
"track" -> client.fetch_track(id)
"album" -> client.fetch_album(id)
"playlist" -> client.fetch_playlist(id)
end
end
end
defp spotify_client do
Application.get_env(:sound_forge, :spotify_client, SoundForge.Spotify.HTTPClient)
end
end
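The URLParser.parse/1 call above is described as regex-based, but its source is not shown here. The following is a minimal sketch of what such a parser could look like, assuming it returns {:ok, %{type: type, id: id}} for track, album, and playlist URLs and {:error, :invalid_spotify_url} for anything else. The module name Sketch.URLParser and the exact pattern are illustrative assumptions, not the project's code.

```elixir
defmodule Sketch.URLParser do
  @moduledoc "Hypothetical stand-in for SoundForge.Spotify.URLParser."

  @spec parse(term()) ::
          {:ok, %{type: String.t(), id: String.t()}} | {:error, :invalid_spotify_url}
  def parse(url) when is_binary(url) do
    # Matches open.spotify.com/<type>/<id>; a trailing query string is ignored.
    # The pattern is an assumption; the real module may accept more URL forms.
    pattern = ~r{^https?://open\.spotify\.com/(track|album|playlist)/([A-Za-z0-9]+)}

    case Regex.run(pattern, url) do
      [_full, type, id] -> {:ok, %{type: type, id: id}}
      _ -> {:error, :invalid_spotify_url}
    end
  end

  # Non-binary input is rejected with the same error tuple.
  def parse(_other), do: {:error, :invalid_spotify_url}
end
```

Because the parser only ever yields the three types that fetch_metadata/1 dispatches on, the case expression in the context module would not need a fallback clause under this assumption.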
Behaviour for testability:
defmodule SoundForge.Spotify.Client do
@callback fetch_track(String.t()) :: {:ok, map()} | {:error, term()}
@callback fetch_album(String.t()) :: {:ok, map()} | {:error, term()}
@callback fetch_playlist(String.t()) :: {:ok, map()} | {:error, term()}
end
This allows test configuration to swap in a mock:
# config/test.exs
config :sound_forge, :spotify_client, SoundForge.Spotify.MockClient
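The mock module itself is not shown in this document. As a rough sketch, a hand-written mock only needs to implement the three callbacks and return canned data. The Sketch.* module names, the "missing" id convention, and the stub payloads below are all assumptions made for illustration; the behaviour is redeclared so the example compiles on its own.

```elixir
defmodule Sketch.Spotify.Client do
  # Mirrors the callbacks of SoundForge.Spotify.Client shown above.
  @callback fetch_track(String.t()) :: {:ok, map()} | {:error, term()}
  @callback fetch_album(String.t()) :: {:ok, map()} | {:error, term()}
  @callback fetch_playlist(String.t()) :: {:ok, map()} | {:error, term()}
end

defmodule Sketch.Spotify.MockClient do
  @behaviour Sketch.Spotify.Client

  # The id "missing" simulates an API failure (a convention invented for this sketch).
  @impl true
  def fetch_track("missing"), do: {:error, :not_found}
  def fetch_track(id), do: {:ok, %{"id" => id, "name" => "Stub Track", "artists" => []}}

  @impl true
  def fetch_album(id), do: {:ok, %{"id" => id, "name" => "Stub Album"}}

  @impl true
  def fetch_playlist(id), do: {:ok, %{"id" => id, "name" => "Stub Playlist"}}
end
```

Declaring @behaviour lets the compiler warn when the mock drifts from the callback list, which is the main benefit of routing the swap through a behaviour rather than an ad hoc module.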
Boundary rules:
- No other context touches the Spotify API directly
- Token management is entirely internal to HTTPClient
- URL parsing is the only stateless utility – everything else requires credentials
Jobs Context (SoundForge.Jobs)
The Jobs context orchestrates background work. It provides higher-level coordination on top of the Music context’s CRUD operations and Oban’s worker infrastructure. Each sub-module (Download, Processing, Analysis) handles the lifecycle of creating a job record, enqueuing the Oban worker, and providing status lookups.
Modules:
- lib/sound_forge/jobs/download.ex – download job orchestration
- lib/sound_forge/jobs/download_worker.ex – Oban worker for spotdl execution
- lib/sound_forge/jobs/processing.ex – stem separation job orchestration
- lib/sound_forge/jobs/analysis.ex – audio analysis job orchestration
Orchestration pattern (Download as canonical example):
defmodule SoundForge.Jobs.Download do
alias SoundForge.Music
alias SoundForge.Repo
def create_job(url) when is_binary(url) do
with {:ok, track} <- find_or_create_track(url),
{:ok, job} <- Music.create_download_job(%{track_id: track.id, status: :queued}) do
enqueue_worker(job, track, url)
{:ok, job}
end
end
def get_job(id) do
case Ecto.UUID.cast(id) do
{:ok, _uuid} ->
case Repo.get(Music.DownloadJob, id) do
nil -> {:error, :not_found}
job -> {:ok, job}
end
:error ->
{:error, :not_found}
end
end
defp find_or_create_track(url) do
case Repo.get_by(Music.Track, spotify_url: url) do
nil -> Music.create_track(%{title: "Pending download", spotify_url: url})
track -> {:ok, track}
end
end
defp enqueue_worker(job, track, url) do
%{track_id: track.id, spotify_url: url, quality: "320k", job_id: job.id}
|> SoundForge.Jobs.DownloadWorker.new()
|> Oban.insert()
end
end
Boundary rules:
- Jobs context depends on Music context for all persistence
- Workers broadcast progress via PubSub – they never push to WebSocket channels directly
- Controllers call Jobs context functions (e.g., Jobs.Download.create_job/1), not workers
Audio Context (SoundForge.Audio)
The Audio context wraps Python interop through Erlang Ports. It provides two GenServer processes: AnalyzerPort for librosa-based feature extraction and DemucsPort for neural stem separation. These are infrastructure services, not domain entities – they manage the OS-level process lifecycle, JSON protocol, and timeout handling.
Modules:
- lib/sound_forge/audio/analyzer_port.ex – GenServer wrapping priv/python/analyzer.py
- lib/sound_forge/audio/demucs_port.ex – GenServer wrapping priv/python/demucs_runner.py
Public API:
# Analysis (2-minute timeout)
{:ok, results} = SoundForge.Audio.AnalyzerPort.analyze("/path/to/audio.mp3")
{:ok, results} = SoundForge.Audio.AnalyzerPort.analyze("/path/to/audio.mp3", ["tempo", "key"])
{:ok, results} = SoundForge.Audio.AnalyzerPort.analyze("/path/to/audio.mp3", ["all"])
# Stem separation (5-minute timeout)
{:ok, result} = SoundForge.Audio.DemucsPort.separate("/path/to/audio.mp3")
{:ok, result} = SoundForge.Audio.DemucsPort.separate("/path/to/audio.mp3",
model: "htdemucs_ft",
output_dir: "/custom/output",
progress_callback: fn pct, msg -> IO.puts("#{pct}%: #{msg}") end
)
Boundary rules:
- Only Oban workers (or tests) call AnalyzerPort/DemucsPort directly
- These GenServers are NOT added to the supervision tree by default – they are started on demand or supervised by workers
- The JSON protocol is an internal detail; callers receive Elixir maps
Storage Context (SoundForge.Storage)
The Storage context manages the local filesystem layout for all persisted audio files. It provides a consistent interface for storing, retrieving, and cleaning up files across subdirectories.
Module: lib/sound_forge/storage.ex
Directory layout:
flowchart TD
root["priv/uploads/"]
root --> dl["downloads/\n(Raw audio files from spotdl)"]
root --> st["stems/\n(Separated stem files from Demucs)"]
root --> an["analysis/\n(Analysis output artifacts)"]
Public API:
SoundForge.Storage.base_path() # => "priv/uploads"
SoundForge.Storage.downloads_path() # => "priv/uploads/downloads"
SoundForge.Storage.stems_path() # => "priv/uploads/stems"
SoundForge.Storage.analysis_path() # => "priv/uploads/analysis"
SoundForge.Storage.ensure_directories!() # Creates all dirs
SoundForge.Storage.store_file(src, "downloads", "a.mp3") # => {:ok, dest_path}
SoundForge.Storage.file_path("downloads", "a.mp3") # => "priv/uploads/downloads/a.mp3"
SoundForge.Storage.file_exists?("downloads", "a.mp3") # => true | false
SoundForge.Storage.delete_file("downloads", "a.mp3") # => :ok | {:error, reason}
SoundForge.Storage.stats() # => %{file_count: 42, total_size_mb: 1.23}
Boundary rules:
- Workers and contexts call Storage for path resolution – they never hardcode paths
- The base path is configurable via Application.get_env(:sound_forge, :storage_path)
- Cleanup operations will integrate with Music context to find orphaned files
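To make the path-resolution rule concrete, here is a minimal sketch of how a few of the functions listed in the public API could be built on a configurable base path. The real SoundForge.Storage module is not shown in this document, so the module name Sketch.Storage, the "priv/uploads" fallback, and the use of File.cp/2 for storing are assumptions.

```elixir
defmodule Sketch.Storage do
  # Hypothetical sketch; not the project's implementation.

  # Falls back to "priv/uploads" when :storage_path is not configured.
  def base_path, do: Application.get_env(:sound_forge, :storage_path, "priv/uploads")

  # Every path is derived from base_path/0, so callers never hardcode directories.
  def file_path(subdir, filename), do: Path.join([base_path(), subdir, filename])

  def file_exists?(subdir, filename), do: File.exists?(file_path(subdir, filename))

  # Copies src to <base>/<subdir>/<filename>, creating the directory if needed.
  def store_file(src, subdir, filename) do
    dest = file_path(subdir, filename)

    with :ok <- File.mkdir_p(Path.dirname(dest)),
         :ok <- File.cp(src, dest) do
      {:ok, dest}
    end
  end

  # Returns :ok or {:error, reason}, as File.rm/1 does.
  def delete_file(subdir, filename), do: File.rm(file_path(subdir, filename))
end
```

Reading the base path at call time rather than at compile time means tests can point :storage_path at a temporary directory without recompiling.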
Service Orchestration: Spotify URL Flow
When a user submits a Spotify URL, the request flows through multiple context boundaries in a well-defined sequence:
flowchart TD
User(["User submits URL"])
Controller["SoundForgeWeb.API.DownloadController\nPOST /api/download/track"]
CreateJob["SoundForge.Jobs.Download.create_job/1"]
FindTrack["find_or_create_track(url)"]
CreateJobRecord["Music.create_download_job(attrs)"]
Enqueue["enqueue_worker(job, track, url)"]
Oban["Oban picks up job from :download queue"]
Worker["SoundForge.Jobs.DownloadWorker.perform/1"]
GetJob["Music.get_download_job!(job_id)"]
UpdateJob1["Music.update_download_job(job, ...)"]
Broadcast1["broadcast_progress(job_id, ...)"]
Spotdl["execute_download(spotify_url, ...)\nspotdl CLI + Storage"]
UpdateJob2["Music.update_download_job(job, ...)\nfinal state"]
Broadcast2["broadcast_progress(job_id, ...)"]
PubSub["Phoenix.PubSub\njobs:{job_id} topic"]
JobChannel["SoundForgeWeb.JobChannel\npushes to WebSocket client"]
DashboardLive["SoundForgeWeb.DashboardLive\nupdates LiveView assigns"]
User --> Controller
Controller --> CreateJob
CreateJob --> FindTrack
CreateJob --> CreateJobRecord
CreateJob --> Enqueue
Enqueue --> Oban
Oban --> Worker
Worker --> GetJob
Worker --> UpdateJob1
Worker --> Broadcast1
Worker --> Spotdl
Worker --> UpdateJob2
Worker --> Broadcast2
Broadcast1 --> PubSub
Broadcast2 --> PubSub
PubSub --> JobChannel
PubSub --> DashboardLive
style FindTrack fill:#e8f4e8,stroke:#4a7c4a
style CreateJobRecord fill:#e8f4e8,stroke:#4a7c4a
style GetJob fill:#e8f4e8,stroke:#4a7c4a
style UpdateJob1 fill:#e8f4e8,stroke:#4a7c4a
style UpdateJob2 fill:#e8f4e8,stroke:#4a7c4a
style Broadcast1 fill:#e8eef4,stroke:#4a6a7c
style Broadcast2 fill:#e8eef4,stroke:#4a6a7c
style CreateJob fill:#f4ede8,stroke:#7c5a4a
style Worker fill:#f4ede8,stroke:#7c5a4a
The key observation is that no context reaches into another’s internals. The DownloadWorker calls Music.update_download_job/2 rather than building its own changeset. It broadcasts to PubSub rather than pushing directly to a channel. The controller calls Jobs.Download.create_job/1 rather than inserting records and enqueuing workers itself.
Transaction Boundaries
Simple Transactions
Most operations use implicit single-row transactions through Repo.insert/1 and Repo.update/1. These are sufficient when creating or updating a single record.
Multi-step Transactions with Ecto.Multi
When a workflow must atomically modify multiple records, Ecto.Multi provides explicit transaction boundaries. This is the planned pattern for the download completion flow:
defmodule SoundForge.Jobs.DownloadWorker do
# Planned: atomic completion with track metadata update
defp complete_download(job, track, output_path, file_size, metadata) do
Ecto.Multi.new()
|> Ecto.Multi.update(:job, DownloadJob.changeset(job, %{
status: :completed,
progress: 100,
output_path: output_path,
file_size: file_size
}))
|> Ecto.Multi.update(:track, Track.changeset(track, %{
title: metadata["name"],
artist: metadata["artists"] |> List.first() |> Map.get("name"),
album: metadata["album"]["name"],
duration: metadata["duration_ms"]
}))
|> Repo.transaction()
|> case do
{:ok, %{job: job, track: _track}} ->
broadcast_progress(job.id, :completed, 100)
:ok
{:error, failed_operation, changeset, _changes} ->
Logger.error("Transaction failed at #{failed_operation}: #{inspect(changeset.errors)}")
{:error, "Transaction failed at #{failed_operation}"}
end
end
end
Current Transaction Pattern
The existing Jobs.Download.create_job/1 uses a with chain of individual database calls, so partial failure is possible: a track could be created without its corresponding job, and because the return value of enqueue_worker/3 is discarded, a job record could exist without a worker ever being enqueued. The planned migration to Ecto.Multi addresses this:
# Planned: atomic job creation
def create_job(url) when is_binary(url) do
Ecto.Multi.new()
|> Ecto.Multi.run(:track, fn _repo, _changes ->
case Repo.get_by(Music.Track, spotify_url: url) do
nil -> Music.create_track(%{title: "Pending download", spotify_url: url})
track -> {:ok, track}
end
end)
|> Ecto.Multi.run(:job, fn _repo, %{track: track} ->
Music.create_download_job(%{track_id: track.id, status: :queued})
end)
|> Ecto.Multi.run(:oban, fn _repo, %{job: job, track: track} ->
%{track_id: track.id, spotify_url: url, quality: "320k", job_id: job.id}
|> SoundForge.Jobs.DownloadWorker.new()
|> Oban.insert()
end)
|> Repo.transaction()
|> case do
{:ok, %{job: job}} -> {:ok, job}
{:error, _step, reason, _changes} -> {:error, reason}
end
end
Error Handling Patterns
Tagged Tuples
Public context functions that can fail return {:ok, result} or {:error, reason}; bang variants such as Music.get_track!/1 raise instead. This is enforced by convention and typespecs:
@spec create_job(String.t()) :: {:ok, map()} | {:error, term()}
@spec get_job(String.t()) :: {:ok, struct()} | {:error, :not_found}
@spec fetch_metadata(String.t()) :: {:ok, map()} | {:error, term()}
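The practical payoff of this convention is that calls compose with `with`, as create_job/1 does. A small self-contained sketch of that composition follows; the module and its two step functions are hypothetical stand-ins for context functions, not part of the codebase.

```elixir
defmodule Sketch.TaggedTuples do
  # Hypothetical steps standing in for context functions.
  def parse_id("track:" <> id) when id != "", do: {:ok, id}
  def parse_id(_input), do: {:error, :invalid_input}

  def lookup("abc123"), do: {:ok, %{id: "abc123", title: "Song"}}
  def lookup(_id), do: {:error, :not_found}

  # `with` continues only while each step matches {:ok, _}. The first value
  # that does not match (here an {:error, reason} tuple) is returned as-is.
  def fetch_title(input) do
    with {:ok, id} <- parse_id(input),
         {:ok, track} <- lookup(id) do
      {:ok, track.title}
    end
  end
end
```

The caller sees a single {:ok, _} or {:error, _} result regardless of which step failed, which is why the convention matters most at context boundaries.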
Controller-Level Error Handling
Controllers use Code.ensure_loaded?/1 with try/rescue as a defensive pattern for modules that may not yet be compiled or available:
# From SoundForgeWeb.API.DownloadController
defp start_download_job(url) do
if Code.ensure_loaded?(SoundForge.Jobs.Download) do
SoundForge.Jobs.Download.create_job(url)
else
{:ok, %{id: generate_job_id(), status: "pending", url: url}}
end
rescue
UndefinedFunctionError ->
{:ok, %{id: generate_job_id(), status: "pending", url: url}}
end
This pattern allows the web layer to operate with stub responses during incremental development, falling back gracefully when a context module is not yet implemented.
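One possible variation on this fallback, sketched here as an assumption rather than as the controller's actual code, is to check for the function explicitly with function_exported?/3 instead of rescuing UndefinedFunctionError. The helper name call_or_stub/4 is hypothetical.

```elixir
defmodule Sketch.Fallback do
  # Calls mod.fun(args...) when the module can be loaded and exports the
  # function with the right arity; otherwise returns the stub value.
  def call_or_stub(mod, fun, args, stub) when is_atom(mod) and is_atom(fun) and is_list(args) do
    if Code.ensure_loaded?(mod) and function_exported?(mod, fun, length(args)) do
      apply(mod, fun, args)
    else
      stub
    end
  end
end
```

The explicit check avoids one drawback of a blanket rescue: an UndefinedFunctionError raised deeper inside a real implementation would otherwise be silently converted into a stub response.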
LiveView Error Handling
LiveView modules use try/rescue to protect against database or context errors, ensuring the UI remains responsive even if a backing service is unavailable:
# From SoundForgeWeb.DashboardLive
defp list_tracks do
  SoundForge.Music.list_tracks()
rescue
  # Any failure (e.g. the database being unavailable) degrades to an empty list
  _ -> []
end
Oban Worker Retry
Workers return {:error, reason} to trigger Oban’s built-in retry mechanism:
use Oban.Worker,
queue: :download,
max_attempts: 3,
priority: 1
# Oban stores args as JSON, so keys arrive as strings in perform/1
def perform(%Oban.Job{args: %{"job_id" => job_id} = args}) do
  case execute_download(args) do
    {:ok, _result} ->
      :ok # Oban marks job as completed

    {:error, reason} ->
      broadcast_progress(job_id, :failed, 0)
      {:error, reason} # Oban retries up to max_attempts
  end
end
Port Error Handling
Erlang Port GenServers handle errors through exit status codes and JSON error parsing:
# Non-zero exit status from Python process
def handle_info({port, {:exit_status, code}}, %{port: port, caller: caller, buffer: buffer} = state) do
error = parse_error(buffer, code)
GenServer.reply(caller, {:error, error})
{:noreply, reset_state(state)}
end
defp parse_error(buffer, exit_code) do
case Jason.decode(String.trim(buffer)) do
{:ok, %{"error" => error_type, "message" => message}} ->
{:error_from_script, error_type, message}
{:ok, %{"error" => error}} ->
{:error_from_script, error}
_ ->
{:exit_code, exit_code, String.trim(buffer)}
end
end
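Callers of the ports receive these error shapes wrapped in {:error, _}. As a sketch of how a worker might turn them into readable messages, assuming only the three tuple shapes produced by parse_error/2 above (the module name and the message wording are invented for illustration):

```elixir
defmodule Sketch.PortErrors do
  # Maps each error shape from parse_error/2 to a log-friendly string.

  # Structured error reported by the Python script, with type and message.
  def describe({:error_from_script, type, message}), do: "#{type}: #{message}"

  # Script reported an error without a separate message field.
  def describe({:error_from_script, error}), do: "script error: #{inspect(error)}"

  # Process exited without producing parseable JSON and without any output.
  def describe({:exit_code, code, ""}), do: "exited with status #{code}"

  # Process exited without parseable JSON; include the raw output.
  def describe({:exit_code, code, output}), do: "exited with status #{code}: #{output}"
end
```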
PubSub Event Patterns
Phoenix.PubSub is the sole mechanism for cross-context real-time communication. No context directly calls into another context’s LiveView or Channel.
Topics
| Topic Pattern | Publisher | Subscribers | Events |
|---|---|---|---|
| "tracks" | Music context | DashboardLive | {:track_added, track} |
| "jobs:{job_id}" | DownloadWorker | JobChannel, DashboardLive | {:job_progress, payload}, {:job_completed, payload}, {:job_failed, payload} |
Publishing Events
Workers broadcast progress updates using a consistent pattern:
# From SoundForge.Jobs.DownloadWorker
defp broadcast_progress(job_id, status, progress) do
Phoenix.PubSub.broadcast(
SoundForge.PubSub,
"jobs:#{job_id}",
{:job_progress, %{job_id: job_id, status: status, progress: progress}}
)
end
Subscribing to Events
WebSocket Channel (for JavaScript clients):
defmodule SoundForgeWeb.JobChannel do
use SoundForgeWeb, :channel
def join("jobs:" <> job_id, _payload, socket) do
Phoenix.PubSub.subscribe(SoundForge.PubSub, "jobs:#{job_id}")
{:ok, assign(socket, :job_id, job_id)}
end
def handle_info({:job_progress, payload}, socket) do
push(socket, "job:progress", payload)
{:noreply, socket}
end
end
LiveView (for server-rendered UI):
defmodule SoundForgeWeb.DashboardLive do
use SoundForgeWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket) do
Phoenix.PubSub.subscribe(SoundForge.PubSub, "tracks")
end
{:ok, socket}
end
def handle_info({:track_added, track}, socket) do
{:noreply, stream_insert(socket, :tracks, track, at: 0)}
end
def handle_info({:job_progress, payload}, socket) do
jobs = Map.put(socket.assigns.active_jobs, payload.job_id, payload)
{:noreply, assign(socket, :active_jobs, jobs)}
end
end
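The subscribe and broadcast calls above depend on Phoenix.PubSub, which cannot run outside the application. To show the same topic-based fan-out in isolation, the sketch below swaps in Elixir's built-in Registry with duplicate keys as a local stand-in. This is not how Phoenix.PubSub is implemented, and the Sketch.LocalPubSub module is hypothetical; only the publish and subscribe shape is meant to carry over.

```elixir
defmodule Sketch.LocalPubSub do
  # Registry-based stand-in for Phoenix.PubSub, for illustration only.
  @registry Sketch.LocalPubSub.Topics

  # Duplicate keys allow many processes to register under the same topic.
  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Registers the calling process under the topic.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Sends the message to every process registered under the topic.
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end
```

A process that calls Sketch.LocalPubSub.subscribe("jobs:abc123") receives any message later broadcast on that topic in its mailbox, which is the same delivery model the handle_info/2 clauses above rely on.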
Event Flow Diagram
flowchart LR
Worker["DownloadWorker.perform/1"]
Broadcast["Phoenix.PubSub.broadcast\njobs:abc123\n{:job_progress, %{...}}"]
PubSub["PubSub\njobs:abc123 topic"]
JobChannel["JobChannel.handle_info/2"]
DashboardLive["DashboardLive.handle_info/2"]
JSClient["JS client"]
Rerender["re-render"]
Worker --> Broadcast
Broadcast --> PubSub
PubSub --> JobChannel
PubSub --> DashboardLive
JobChannel -->|"push(job:progress, payload)"| JSClient
DashboardLive -->|"assign(:active_jobs, ...)"| Rerender
Planned Events
| Topic | Event | Purpose |
|---|---|---|
| "tracks" | {:track_updated, track} | Metadata updates after Spotify fetch |
| "tracks" | {:track_deleted, track_id} | Track removal propagation |
| "jobs:{id}" | {:job_started, payload} | Worker pickup notification |
| "processing:{id}" | {:stem_ready, stem} | Individual stem completion |
| "analysis:{id}" | {:feature_extracted, feature} | Incremental analysis results |