07 – Integration Patterns
Overview
Sound Forge Alchemy integrates with four external systems: the Spotify Web API (metadata), Python processes via Erlang Ports (audio analysis and stem separation), the spotdl CLI tool (downloading), and the local filesystem (storage). Each integration follows a distinct pattern optimized for its characteristics – HTTP request/response for Spotify, long-running process communication for Python, CLI execution for downloads, and filesystem operations for storage.
Spotify API Integration
Architecture
flowchart TD
CTX["SoundForge.Spotify (context)"]
CTX --> UP["URLParser\n(stateless regex parsing)"]
CTX --> CB["Client behaviour\n(defines the interface)"]
CTX --> HC["HTTPClient\n(Req-based implementation)"]
HC --> ETS["ETS token cache\n(:spotify_tokens)"]
HC --> OA["Spotify OAuth2 token endpoint"]
HC --> API["Spotify Web API v1"]
OAuth2 Client Credentials Flow
The Spotify Web API uses the client credentials grant type, suitable for server-to-server access that does not require user authorization. Credentials are configured via environment variables:
# config/runtime.exs
config :sound_forge, :spotify,
client_id: System.get_env("SPOTIFY_CLIENT_ID"),
client_secret: System.get_env("SPOTIFY_CLIENT_SECRET")
Token acquisition:
defp request_token(config) do
auth = Base.encode64("#{config.client_id}:#{config.client_secret}")
headers = [
{"Authorization", "Basic #{auth}"},
{"Content-Type", "application/x-www-form-urlencoded"}
]
case Req.post("https://accounts.spotify.com/api/token",
form: %{grant_type: "client_credentials"},
headers: headers
) do
{:ok, %Req.Response{status: 200, body: %{"access_token" => token}}} ->
{:ok, token}
{:ok, %Req.Response{status: status, body: body}} ->
{:error, {:token_error, status, body}}
{:error, reason} ->
{:error, reason}
end
end
ETS Token Cache
Tokens are cached in a named ETS table with a 3500-second TTL (Spotify tokens expire after 3600 seconds, giving a 100-second safety margin):
@token_ttl 3500
@token_table :spotify_tokens
def init do
:ets.new(@token_table, [:named_table, :public, :set])
rescue
ArgumentError -> :already_exists
end
defp get_cached_token do
init() # idempotent
case :ets.lookup(@token_table, :access_token) do
[{:access_token, token, expires_at}] ->
if System.system_time(:second) < expires_at do
{:ok, token}
else
:error
end
[] ->
:error
end
end
defp cache_token(token) do
expires_at = System.system_time(:second) + @token_ttl
:ets.insert(@token_table, {:access_token, token, expires_at})
end
Why ETS instead of GenServer state?
- The ETS table survives GenServer restarts (it is owned by the Application process)
- Multiple processes (concurrent Oban workers, LiveView processes) can read the token without serializing through a single GenServer
- Token refresh is lazy – the first request after expiry triggers a new token fetch
Req HTTP Client
API requests use Req.get/2 with bearer token authentication:
defp make_api_request(resource_type, id, token) do
url = "https://api.spotify.com/v1/#{resource_type}/#{id}"
case Req.get(url, headers: [{"Authorization", "Bearer #{token}"}]) do
{:ok, %Req.Response{status: 200} = response} ->
{:ok, response}
{:ok, response} ->
{:error, response}
{:error, reason} ->
{:error, reason}
end
end
The full token-fetch-and-retry flow chains through with:
defp fetch_resource(resource_type, id) do
with {:ok, token} <- get_access_token(),
{:ok, response} <- make_api_request(resource_type, id, token) do
{:ok, response.body}
else
{:error, %Req.Response{status: status, body: body}} ->
Logger.error("Spotify API error: #{status} - #{inspect(body)}")
{:error, {:api_error, status, body}}
{:error, reason} = error ->
Logger.error("Spotify request failed: #{inspect(reason)}")
error
end
end
URL Parsing
The URLParser module handles the various URL formats Spotify uses:
@spotify_regex ~r{(?:https?://)?(?:open\.)?spotify\.com/(?:intl-\w+/)?(track|album|playlist)/([a-zA-Z0-9]+)}
def parse(url) when is_binary(url) do
case Regex.run(@spotify_regex, url) do
[_, type, id] -> {:ok, %{type: type, id: id}}
_ -> {:error, :invalid_spotify_url}
end
end
Supported URL formats:
https://open.spotify.com/track/6rqhFgbbKwnb9MLmUQDhG6https://open.spotify.com/intl-en/track/6rqhFgbbKwnb9MLmUQDhG6spotify.com/album/xyz789https://open.spotify.com/playlist/37i9dQZF1DXcBWIGoYBM5M
Testability via Behaviour
The Client behaviour allows swapping the HTTP implementation in tests:
# lib/sound_forge/spotify/client.ex
defmodule SoundForge.Spotify.Client do
@callback fetch_track(String.t()) :: {:ok, map()} | {:error, term()}
@callback fetch_album(String.t()) :: {:ok, map()} | {:error, term()}
@callback fetch_playlist(String.t()) :: {:ok, map()} | {:error, term()}
end
# test/support/mocks.ex
Mox.defmock(SoundForge.Spotify.MockClient, for: SoundForge.Spotify.Client)
# config/test.exs
config :sound_forge, :spotify_client, SoundForge.Spotify.MockClient
# In tests
expect(SoundForge.Spotify.MockClient, :fetch_track, fn "abc123" ->
{:ok, %{"id" => "abc123", "name" => "Test Song", "artists" => [%{"name" => "Test Artist"}]}}
end)
Python Integration via Erlang Ports
Architecture
flowchart TD
subgraph BEAM["Elixir (BEAM VM)"]
AP["AnalyzerPort\n(GenServer)"]
DP["DemucsPort\n(GenServer)"]
end
subgraph OS["OS Processes"]
PY1["Python 3\nanalyzer.py\n(librosa)"]
PY2["Python 3\ndemucs_runner.py\n(demucs/torch)"]
end
AP <-->|"stdin/stdout\nexit_status"| PY1
DP <-->|"stdin/stdout\nexit_status"| PY2
Port Opening
Both ports use Port.open/2 with :spawn_executable for maximum control:
Port.open({:spawn_executable, python}, [
:binary, # receive data as binaries, not charlists
:exit_status, # receive {port, {:exit_status, code}} on process exit
args: args # command-line arguments to the Python script
])
Why :spawn_executable over :spawn?
:spawn_executablebypasses the shell, preventing injection attacks- Arguments are passed as a list, not interpolated into a string
- The BEAM manages the OS process lifecycle directly
JSON Protocol: AnalyzerPort (Request-Response)
The AnalyzerPort uses a single-shot protocol. Python runs, produces one JSON object on stdout, and exits:
sequenceDiagram
participant E as Elixir
participant P as Python
E->>P: Port.open(python, args)
Note over P: Load audio<br/>Extract features<br/>Print JSON result
P-->>E: {port, {:data, json_chunk_1}}
P-->>E: {port, {:data, json_chunk_2}}
P-->>E: {port, {:exit_status, 0}}
Note over E: buffer = chunk_1 <> chunk_2<br/>Jason.decode(buffer)
Data arrives in chunks (the OS may split stdout into multiple reads). The GenServer accumulates chunks in a buffer and parses the complete JSON only after receiving exit_status: 0:
def handle_info({port, {:data, data}}, %{port: port, buffer: buffer} = state) do
{:noreply, %{state | buffer: buffer <> data}}
end
def handle_info({port, {:exit_status, 0}}, %{port: port, caller: caller, buffer: buffer} = state) do
result = parse_output(buffer)
GenServer.reply(caller, result)
{:noreply, reset_state(state)}
end
JSON Lines Protocol: DemucsPort (Streaming)
The DemucsPort uses a JSON-lines protocol where each line is an independent JSON object. This enables real-time progress reporting during long-running stem separation:
{"type": "progress", "percent": 10, "message": "Loading model..."}
{"type": "progress", "percent": 30, "message": "Separating sources..."}
{"type": "progress", "percent": 70, "message": "Writing stems..."}
{"type": "result", "stems": {"vocals": "/path", "drums": "/path", ...}, "model": "htdemucs", "output_dir": "/tmp/demucs"}
The GenServer splits incoming data on newline boundaries, processing complete lines immediately while buffering incomplete trailing data:
def handle_info({port, {:data, data}}, %{port: port, buffer: buffer} = state) do
new_buffer = buffer <> data
{lines, remaining} = extract_lines(new_buffer)
Enum.each(lines, fn line ->
process_json_line(line, state)
end)
{:noreply, %{state | buffer: remaining}}
end
defp extract_lines(buffer) do
lines = String.split(buffer, "\n")
case List.pop_at(lines, -1) do
{incomplete, complete_lines} -> {complete_lines, incomplete || ""}
nil -> {[], buffer}
end
end
Timeout Handling
Both ports set explicit timeouts on the GenServer.call/3:
# AnalyzerPort
@timeout 120_000 # 2 minutes
def analyze(audio_path, features) do
GenServer.call(__MODULE__, {:analyze, audio_path, features}, @timeout)
end
# DemucsPort
@timeout 300_000 # 5 minutes
def separate(audio_path, opts) do
GenServer.call(__MODULE__, {:separate, audio_path, model, output_dir, callback}, @timeout)
end
If the Python process exceeds the timeout:
- The calling process receives
** (exit) exited in: GenServer.call(..., 120000)with reason:timeout - The GenServer continues running with the port still open
- Eventually the port exits and the GenServer resets its state
Planned improvement: Add a monitor or timer that kills the OS process if it exceeds the timeout:
# Planned: force-kill hung Python processes
def handle_call({:analyze, path, features}, from, state) do
port = open_port(python, script, path, features)
timer = Process.send_after(self(), {:port_timeout, port}, @timeout)
{:noreply, %{state | port: port, caller: from, timer: timer}}
end
def handle_info({:port_timeout, port}, %{port: port, caller: caller} = state) do
Port.close(port)
GenServer.reply(caller, {:error, :timeout})
{:noreply, reset_state(state)}
end
Error Recovery
Python script errors are communicated through two channels:
- JSON error objects on stdout:
{"error": "FileNotFoundError", "message": "No such file"} - Non-zero exit status: Caught by
:exit_statushandler
The GenServer parses both:
defp parse_error(buffer, exit_code) do
trimmed = String.trim(buffer)
case Jason.decode(trimmed) do
{:ok, %{"error" => error_type, "message" => message}} ->
{:error_from_script, error_type, message}
{:ok, %{"error" => error}} ->
{:error_from_script, error}
_ ->
{:exit_code, exit_code, trimmed}
end
end
Python Script Location
Scripts are stored in priv/python/ and located at runtime via :code.priv_dir/1:
defp find_analyzer_script do
script_path = Path.join(:code.priv_dir(:sound_forge), "python/analyzer.py")
if File.exists?(script_path) do
{:ok, script_path}
else
{:error, {:script_not_found, script_path}}
end
end
This works correctly in both development (pointing to the project’s priv/ directory) and releases (pointing to the release’s bundled priv/ directory).
spotdl CLI Integration
Architecture
The DownloadWorker shells out to spotdl via System.cmd/3. Unlike the Erlang Port integration, this is a simple synchronous execution – the worker blocks until the download completes.
flowchart TD
DW["DownloadWorker.perform/1"]
SP["spotdl subprocess"]
RC["{output, exit_code}"]
DW -->|"System.cmd(\"spotdl\", args, stderr_to_stdout: true)"| SP
SP -->|"Downloads from YouTube (found via Spotify metadata match)\nConverts to MP3 at specified bitrate\nWrites to output_file"| RC
Command Construction
args = [
spotify_url, # "https://open.spotify.com/track/abc123"
"--output", output_file, # "priv/uploads/downloads/{track_id}.mp3"
"--format", "mp3",
"--bitrate", quality # "320k"
]
case System.cmd("spotdl", args, stderr_to_stdout: true) do
{_output, 0} ->
{:ok, %{path: output_file, size: File.stat!(output_file).size}}
{error_output, _code} ->
{:error, "Download failed: #{error_output}"}
end
Why System.cmd Instead of Erlang Ports?
- spotdl is a one-shot tool: it runs, downloads, and exits
- No streaming protocol needed – there is no intermediate output to parse
stderr_to_stdout: truecaptures all output for error reporting- The Oban worker already provides the concurrency control (3 workers max)
Planned: Progress Streaming
For future progress reporting, spotdl could be wrapped in a Port instead:
# Planned: streaming progress from spotdl
defp execute_download_with_progress(spotify_url, quality, track_id, job_id) do
port = Port.open({:spawn_executable, System.find_executable("spotdl")}, [
:binary,
:exit_status,
:stderr_to_stdout,
args: [spotify_url, "--output", output_file, "--format", "mp3", "--bitrate", quality]
])
receive_download_output(port, job_id, "")
end
defp receive_download_output(port, job_id, buffer) do
receive do
{^port, {:data, data}} ->
new_buffer = buffer <> data
maybe_broadcast_progress(new_buffer, job_id)
receive_download_output(port, job_id, new_buffer)
{^port, {:exit_status, 0}} ->
{:ok, buffer}
{^port, {:exit_status, code}} ->
{:error, "spotdl exited with code #{code}: #{buffer}"}
after
300_000 ->
Port.close(port)
{:error, :download_timeout}
end
end
File System Integration via Storage Module
Architecture
The Storage module provides a thin abstraction over the local filesystem, with configurable base path and three fixed subdirectories:
flowchart TD
ROOT["priv/uploads/\n(configurable via :storage_path)"]
DL["downloads/\nRaw audio files from spotdl"]
MP3["{track_id}.mp3\nNamed by UUID"]
ST["stems/\nSeparated stem files from Demucs"]
TD["{track_id}/\nOne directory per track"]
VOC["vocals.wav"]
DR["drums.wav"]
BA["bass.wav"]
OT["other.wav"]
AN["analysis/\nAnalysis output artifacts"]
JSON["{track_id}.json\nFeature extraction results"]
ROOT --> DL
DL --> MP3
ROOT --> ST
ST --> TD
TD --> VOC
TD --> DR
TD --> BA
TD --> OT
ROOT --> AN
AN --> JSON
Path Resolution
All paths are resolved through the Storage module, never hardcoded:
defmodule SoundForge.Storage do
@default_base_path "priv/uploads"
def base_path do
Application.get_env(:sound_forge, :storage_path, @default_base_path)
end
def downloads_path, do: Path.join(base_path(), "downloads")
def stems_path, do: Path.join(base_path(), "stems")
def analysis_path, do: Path.join(base_path(), "analysis")
end
File Operations
# Store a file (copy from source to managed storage)
def store_file(source_path, subdirectory, filename) do
dest_dir = Path.join(base_path(), subdirectory)
File.mkdir_p!(dest_dir)
dest_path = Path.join(dest_dir, filename)
case File.cp(source_path, dest_path) do
:ok -> {:ok, dest_path}
{:error, reason} -> {:error, reason}
end
end
# Delete with idempotent handling
def delete_file(subdirectory, filename) do
path = file_path(subdirectory, filename)
case File.rm(path) do
:ok -> :ok
{:error, :enoent} -> :ok # already gone -- not an error
{:error, reason} -> {:error, reason}
end
end
Storage Statistics
def stats do
base = base_path()
if File.dir?(base) do
{file_count, total_size} = count_files(base)
%{
base_path: base,
file_count: file_count,
total_size_bytes: total_size,
total_size_mb: Float.round(total_size / (1024 * 1024), 2)
}
else
%{base_path: base, file_count: 0, total_size_bytes: 0, total_size_mb: 0.0}
end
end
Planned: Serving Files
Files are served to the browser through the FileController, which maps URL paths to storage locations:
# From router.ex
get "/files/*path", FileController, :serve
Circuit Breaker Pattern (Planned)
External service calls (Spotify API, spotdl) can fail due to rate limiting, network issues, or service outages. A circuit breaker prevents cascading failures by short-circuiting requests when a threshold of failures is reached.
Planned Implementation
defmodule SoundForge.CircuitBreaker do
use GenServer
@failure_threshold 5
@recovery_timeout 30_000 # 30 seconds
defstruct state: :closed, failure_count: 0, last_failure: nil
def call(service, fun) do
case GenServer.call(__MODULE__, {:check, service}) do
:ok ->
case fun.() do
{:ok, _} = result ->
GenServer.cast(__MODULE__, {:success, service})
result
{:error, _} = error ->
GenServer.cast(__MODULE__, {:failure, service})
error
end
:open ->
{:error, {:circuit_open, service}}
end
end
@impl true
def handle_call({:check, service}, _from, state) do
breaker = Map.get(state, service, %__MODULE__{})
case breaker.state do
:closed ->
{:reply, :ok, state}
:open ->
if System.monotonic_time(:millisecond) - breaker.last_failure > @recovery_timeout do
# Half-open: allow one request through
{:reply, :ok, Map.put(state, service, %{breaker | state: :half_open})}
else
{:reply, :open, state}
end
:half_open ->
{:reply, :ok, state}
end
end
@impl true
def handle_cast({:success, service}, state) do
{:noreply, Map.put(state, service, %__MODULE__{})}
end
@impl true
def handle_cast({:failure, service}, state) do
breaker = Map.get(state, service, %__MODULE__{})
new_count = breaker.failure_count + 1
new_breaker =
if new_count >= @failure_threshold do
%{breaker | state: :open, failure_count: new_count, last_failure: System.monotonic_time(:millisecond)}
else
%{breaker | failure_count: new_count}
end
{:noreply, Map.put(state, service, new_breaker)}
end
end
Usage in the Spotify context:
def fetch_metadata(url) do
with {:ok, %{type: type, id: id}} <- URLParser.parse(url) do
CircuitBreaker.call(:spotify, fn ->
case type do
"track" -> spotify_client().fetch_track(id)
"album" -> spotify_client().fetch_album(id)
"playlist" -> spotify_client().fetch_playlist(id)
end
end)
end
end
Circuit States
stateDiagram-v2
[*] --> CLOSED
CLOSED --> CLOSED : success
CLOSED --> CLOSED : failure (< threshold)
CLOSED --> OPEN : failure (>= threshold)
OPEN --> HALF_OPEN : recovery_timeout expires
HALF_OPEN --> CLOSED : success
HALF_OPEN --> OPEN : failure
Webhook Patterns (Planned)
For future integrations (e.g., notifying external services when processing completes), a webhook dispatch system is planned:
Planned Architecture
defmodule SoundForge.Webhooks do
@moduledoc """
Webhook dispatch for external service notifications.
"""
alias SoundForge.Repo
def dispatch(event, payload) do
# Find all registered webhooks for this event
webhooks = list_webhooks_for_event(event)
Enum.each(webhooks, fn webhook ->
%{
webhook_id: webhook.id,
event: event,
payload: payload,
url: webhook.url
}
|> SoundForge.Jobs.WebhookWorker.new()
|> Oban.insert()
end)
end
end
defmodule SoundForge.Jobs.WebhookWorker do
use Oban.Worker,
queue: :webhooks,
max_attempts: 5
@impl Oban.Worker
def perform(%Oban.Job{args: %{"url" => url, "event" => event, "payload" => payload}}) do
body = Jason.encode!(%{event: event, payload: payload, timestamp: DateTime.utc_now()})
case Req.post(url,
body: body,
headers: [{"Content-Type", "application/json"}],
receive_timeout: 10_000
) do
{:ok, %Req.Response{status: status}} when status in 200..299 ->
:ok
{:ok, %Req.Response{status: status}} ->
{:error, "Webhook returned status #{status}"}
{:error, reason} ->
{:error, inspect(reason)}
end
end
end
Planned Events
| Event | Payload | Use Case |
|---|---|---|
track.downloaded |
%{track_id, file_path, file_size} |
Trigger external processing |
track.processed |
%{track_id, stems: [...]} |
Notify DAW integrations |
track.analyzed |
%{track_id, tempo, key, energy} |
Feed music databases |
job.failed |
%{job_id, worker, error} |
Alert monitoring systems |
Integration Summary
| Integration | Transport | Protocol | Timeout | Error Strategy |
|---|---|---|---|---|
| Spotify API | HTTPS (Req) | REST JSON | Req default (30s) | ETS token cache, error tuples |
| AnalyzerPort | Erlang Port | JSON single-shot | 2 min | Exit status parsing, GenServer reset |
| DemucsPort | Erlang Port | JSON lines (streaming) | 5 min | Exit status parsing, progress callbacks |
| spotdl CLI | System.cmd | CLI exit code + stdout | Oban job timeout | Oban retry (3 attempts) |
| Storage | Local filesystem | File.cp/rm/stat | N/A | Error tuples, idempotent delete |
| Circuit Breaker | Internal GenServer | State machine | 30s recovery | Threshold-based tripping |
| Webhooks | HTTPS (Req) | POST JSON | 10s per request | Oban retry (5 attempts) |