Add MP3 upload pipeline foundation

This commit is contained in:
diyaa 2026-05-28 17:43:00 +02:00
parent 799c07b068
commit 98edfa0faf
24 changed files with 2469 additions and 93 deletions

View File

@ -29,6 +29,7 @@ struct MacLibraryView: View {
)
statusRow(title: "Last heartbeat", value: viewModel.lastHeartbeatStatus)
statusRow(title: "Last bootstrap", value: viewModel.lastSyncBootstrapStatus)
statusRow(title: "Last upload", value: viewModel.lastUploadStatus)
if let lastBootstrapTrackCount = viewModel.lastBootstrapTrackCount {
statusRow(title: "Bootstrap tracks", value: "\(lastBootstrapTrackCount)")
@ -95,7 +96,38 @@ struct MacLibraryView: View {
Text(viewModel.scanStatus)
.foregroundStyle(.secondary)
List {
HStack(spacing: 12) {
Button("Upload Selected Track") {
Task {
await viewModel.uploadSelectedTrack()
}
}
.disabled(
viewModel.selectedTrackID == nil
|| viewModel.registeredDeviceId == nil
|| viewModel.isUploadingAnyTrack
|| viewModel.isUploadingAllTracks
)
Button("Upload All Local Tracks") {
Task {
await viewModel.uploadAllLocalTracks()
}
}
.disabled(
viewModel.tracks.isEmpty
|| viewModel.registeredDeviceId == nil
|| viewModel.isUploadingAnyTrack
|| viewModel.isUploadingAllTracks
)
if viewModel.isUploadingAnyTrack || viewModel.isUploadingAllTracks {
ProgressView()
.controlSize(.small)
}
}
List(selection: $viewModel.selectedTrackID) {
ForEach(viewModel.tracks, id: \.id) { track in
HStack(alignment: .top, spacing: 12) {
Button {
@ -106,9 +138,19 @@ struct MacLibraryView: View {
}
.buttonStyle(.borderless)
VStack(alignment: .leading, spacing: 4) {
Text(track.title)
.font(.headline)
VStack(alignment: .leading, spacing: 6) {
HStack(alignment: .center, spacing: 8) {
Text(track.title)
.font(.headline)
Text(viewModel.uploadStatusLabel(for: track))
.font(.caption)
.foregroundStyle(uploadBadgeColor(for: track))
.padding(.horizontal, 8)
.padding(.vertical, 4)
.background(uploadBadgeColor(for: track).opacity(0.12), in: Capsule())
}
Text(track.artist)
.foregroundStyle(.secondary)
@ -124,6 +166,27 @@ struct MacLibraryView: View {
.foregroundStyle(.tertiary)
}
if let remoteTrackId = track.remoteTrackId, !remoteTrackId.isEmpty {
Text("Remote track ID: \(remoteTrackId)")
.font(.caption2)
.foregroundStyle(.secondary)
.textSelection(.enabled)
}
if let uploadProgress = viewModel.uploadProgress(for: track) {
ProgressView(value: uploadProgress)
.frame(maxWidth: 220)
} else if [.preparing, .uploading].contains(viewModel.uploadStatus(for: track)) {
ProgressView()
.frame(maxWidth: 220, alignment: .leading)
}
if let lastUploadError = track.lastUploadError, !lastUploadError.isEmpty {
Text(lastUploadError)
.font(.caption2)
.foregroundStyle(.red)
}
Text(track.localFilePath)
.font(.caption2)
.foregroundStyle(.tertiary)
@ -142,6 +205,7 @@ struct MacLibraryView: View {
}
}
.padding(.vertical, 4)
.tag(track.id)
}
}
.overlay {
@ -169,6 +233,21 @@ struct MacLibraryView: View {
return String(format: "%d:%02d", minutes, seconds)
}
private func uploadBadgeColor(for track: LibraryTrack) -> Color {
switch viewModel.uploadStatus(for: track) {
case .localOnly:
return .secondary
case .preparing:
return .orange
case .uploading:
return .blue
case .uploaded:
return .green
case .failed:
return .red
}
}
private var displayedPlaybackTime: Double {
scrubbedPlaybackTime ?? viewModel.nowPlayingState.currentTime
}

View File

@ -10,6 +10,7 @@ import VelodyUtilities
@Observable
final class MacLibraryViewModel {
var tracks: [LibraryTrack] = []
var selectedTrackID: String?
var selectedFolderPath = "No folder selected"
var scanStatus = "Choose a folder to begin local discovery."
var discoveredTrackCount = 0
@ -21,23 +22,30 @@ final class MacLibraryViewModel {
var registeredDeviceId: String?
var lastHeartbeatStatus = "No heartbeat sent yet."
var lastSyncBootstrapStatus = "No sync bootstrap run yet."
var lastUploadStatus = "No uploads run yet."
var lastBootstrapTrackCount: Int?
var lastBootstrapCursor: String?
var isRegisteringDevice = false
var isSendingHeartbeat = false
var isRunningSyncBootstrap = false
var isUploadingAllTracks = false
var activeUploadTrackIDs: Set<String> = []
var uploadProgressByTrackID: [String: Double] = [:]
private let folderAccessService: any VelodyPersistence.FolderAccessService
private let catalogService: any LocalCatalogService
private let trackRepository: any TrackRepository
private let localMusicScanner: any LocalMusicScanner
private let playbackController: PlaybackController
private let keychainService: any KeychainService
private let userDefaults: UserDefaults
private let fileManager: FileManager
private var hasLoaded = false
init(
userDefaults: UserDefaults = .standard,
keychainService: any KeychainService = SystemKeychainService(service: "de.diyaa.velody.mac")
keychainService: any KeychainService = SystemKeychainService(service: "de.diyaa.velody.mac"),
fileManager: FileManager = .default
) {
let folderAccessService = FolderAccessService()
let localMusicScanner = FileSystemLocalMusicScanner(
@ -53,10 +61,12 @@ final class MacLibraryViewModel {
self.folderAccessService = folderAccessService
self.catalogService = DefaultLocalCatalogService(repository: repository)
self.trackRepository = repository
self.localMusicScanner = localMusicScanner
self.playbackController = playbackController
self.keychainService = keychainService
self.userDefaults = userDefaults
self.fileManager = fileManager
self.serverURLString = userDefaults.string(forKey: Self.serverURLDefaultsKey)
?? ServerEnvironment.defaultLocalBaseURL.absoluteString
self.nowPlayingState = playbackController.nowPlayingState
@ -71,6 +81,10 @@ final class MacLibraryViewModel {
}
}
var isUploadingAnyTrack: Bool {
!activeUploadTrackIDs.isEmpty
}
func loadIfNeeded() async {
guard !hasLoaded else { return }
hasLoaded = true
@ -82,6 +96,7 @@ final class MacLibraryViewModel {
}
playbackController.setCatalogTracks(tracks)
refreshSelectedTrackIfNeeded()
await restoreDeviceIdentity()
}
@ -91,6 +106,7 @@ final class MacLibraryViewModel {
scanStatus = "Folder selected. Run a manual scan to discover MP3 files."
tracks = []
discoveredTrackCount = 0
selectedTrackID = nil
playbackController.setCatalogTracks([])
Task {
do {
@ -124,6 +140,7 @@ final class MacLibraryViewModel {
tracks = scanResult.tracks
discoveredTrackCount = tracks.count
playbackController.setCatalogTracks(tracks)
refreshSelectedTrackIfNeeded()
scanStatus = Self.scanStatus(
for: scanResult,
activeTrackCount: discoveredTrackCount
@ -202,6 +219,101 @@ final class MacLibraryViewModel {
nowPlayingState.error?.errorDescription
}
func uploadSelectedTrack() async {
guard let selectedTrackID else {
lastUploadStatus = "Select a local track before uploading."
return
}
_ = await uploadTrack(trackID: selectedTrackID)
}
func uploadAllLocalTracks() async {
guard !tracks.isEmpty else {
lastUploadStatus = "Scan a folder before starting uploads."
return
}
isUploadingAllTracks = true
lastUploadStatus = "Uploading all local tracks..."
defer {
isUploadingAllTracks = false
}
var uploadedCount = 0
var failedCount = 0
var skippedDuplicateCount = 0
var uploadedTrackIDsBySHA: [String: String] = [:]
var seenSHA256Values = Set<String>()
for track in tracks {
let sha256: String
do {
sha256 = try await ensureSHA256(forTrackID: track.id)
} catch {
failedCount += 1
lastUploadStatus = "Upload failed for \(track.title): \(error.localizedDescription)"
try? await setTrackUploadState(
trackID: track.id,
status: .failed,
remoteTrackId: track.remoteTrackId,
lastUploadError: error.localizedDescription,
progress: nil
)
continue
}
if !seenSHA256Values.insert(sha256).inserted {
skippedDuplicateCount += 1
if let remoteTrackID = uploadedTrackIDsBySHA[sha256] {
try? await setTrackUploadState(
trackID: track.id,
status: .uploaded,
remoteTrackId: remoteTrackID,
lastUploadError: nil,
progress: 1
)
}
continue
}
switch await uploadTrack(trackID: track.id, knownSHA256: sha256) {
case .success(let remoteTrackID):
uploadedCount += 1
if let remoteTrackID {
uploadedTrackIDsBySHA[sha256] = remoteTrackID
}
case .failure:
failedCount += 1
}
}
lastUploadStatus = "Bulk upload finished. Uploaded: \(uploadedCount). Failed: \(failedCount). Duplicates skipped: \(skippedDuplicateCount)."
}
func uploadStatus(for track: LibraryTrack) -> LocalUploadStatus {
track.uploadStatus ?? .localOnly
}
func uploadStatusLabel(for track: LibraryTrack) -> String {
switch uploadStatus(for: track) {
case .localOnly:
return "Local only"
case .preparing:
return "Preparing"
case .uploading:
return "Uploading"
case .uploaded:
return "Uploaded"
case .failed:
return "Failed"
}
}
func uploadProgress(for track: LibraryTrack) -> Double? {
uploadProgressByTrackID[track.id]
}
func persistServerURLSelection() {
guard let serverURL = Self.normalizedServerURL(from: serverURLString) else {
return
@ -282,6 +394,146 @@ final class MacLibraryViewModel {
}
}
private func uploadTrack(
trackID: String,
knownSHA256: String? = nil
) async -> UploadOutcome {
guard !activeUploadTrackIDs.contains(trackID) else {
return .failure
}
guard let initialTrack = track(for: trackID) else {
lastUploadStatus = "The selected track is no longer in the local catalog."
return .failure
}
activeUploadTrackIDs.insert(trackID)
defer {
activeUploadTrackIDs.remove(trackID)
}
do {
let environment = try currentEnvironment()
let deviceId = try await currentDeviceId()
let fileURL = URL(fileURLWithPath: initialTrack.localFilePath)
try await withStoredFolderAccess {
guard fileManager.fileExists(atPath: fileURL.path) else {
throw UploadPipelineError.localFileMissing
}
}
try await setTrackUploadState(
trackID: trackID,
status: .preparing,
remoteTrackId: initialTrack.remoteTrackId,
lastUploadError: nil,
progress: 0.15
)
let sha256 = try await ensureSHA256(
forTrackID: trackID,
knownValue: knownSHA256
)
let sizeBytes = try await withStoredFolderAccess {
try fileSize(at: fileURL)
}
let apiClient = makeAPIClient(for: environment)
let prepareResponse = try await apiClient.prepareUpload(
UploadPrepareRequest(
deviceId: deviceId,
sha256: sha256,
originalFilename: fileURL.lastPathComponent,
sizeBytes: sizeBytes
)
)
switch prepareResponse.status {
case .exists:
let remoteTrackID = prepareResponse.trackId ?? currentTrack(for: trackID)?.remoteTrackId
try await setTrackUploadState(
trackID: trackID,
status: .uploaded,
remoteTrackId: remoteTrackID,
lastUploadError: nil,
progress: 1
)
lastUploadStatus = remoteTrackID.map {
"Track already exists on the server as \($0)."
} ?? "Track already exists on the server."
return .success(remoteTrackId: remoteTrackID)
case .uploadRequired:
guard let uploadId = prepareResponse.uploadId else {
throw UploadPipelineError.invalidPrepareResponse
}
try await setTrackUploadState(
trackID: trackID,
status: .uploading,
remoteTrackId: currentTrack(for: trackID)?.remoteTrackId,
lastUploadError: nil,
progress: 0.55
)
let uploadResponse = try await withStoredFolderAccess {
try await apiClient.uploadFile(
uploadId: uploadId,
fileURL: fileURL,
mimeType: "audio/mpeg"
)
}
guard uploadResponse.status == .completed else {
throw UploadPipelineError.uploadDidNotComplete(uploadResponse.status.rawValue)
}
try await setTrackUploadState(
trackID: trackID,
status: .uploading,
remoteTrackId: currentTrack(for: trackID)?.remoteTrackId,
lastUploadError: nil,
progress: 0.85
)
guard let track = currentTrack(for: trackID) else {
throw UploadPipelineError.trackMissing
}
let finalizeResponse = try await apiClient.finalizeUpload(
uploadId: uploadId,
payload: UploadFinalizeRequest(
title: track.title,
artist: track.artist,
album: track.album,
durationMs: durationMilliseconds(from: track.durationSeconds)
)
)
try await setTrackUploadState(
trackID: trackID,
status: .uploaded,
remoteTrackId: finalizeResponse.trackId,
lastUploadError: nil,
progress: 1
)
lastUploadStatus = "Uploaded \(track.title) as remote track \(finalizeResponse.trackId)."
return .success(remoteTrackId: finalizeResponse.trackId)
}
} catch {
let remoteTrackID = currentTrack(for: trackID)?.remoteTrackId
try? await setTrackUploadState(
trackID: trackID,
status: .failed,
remoteTrackId: remoteTrackID,
lastUploadError: error.localizedDescription,
progress: nil
)
lastUploadStatus = "Upload failed for \(initialTrack.title): \(error.localizedDescription)"
return .failure
}
}
private func restoreDeviceIdentity() async {
do {
let deviceId = try await keychainService.loadValue(forKey: Self.deviceIdKey)
@ -336,6 +588,145 @@ final class MacLibraryViewModel {
URLSessionVelodyAPIClient(environment: environment)
}
private func currentTrack(for trackID: String) -> LibraryTrack? {
tracks.first(where: { $0.id == trackID })
}
private func track(for trackID: String) -> LibraryTrack? {
tracks.first(where: { $0.id == trackID })
}
private func ensureSHA256(
forTrackID trackID: String,
knownValue: String? = nil
) async throws -> String {
if let knownValue, !knownValue.isEmpty {
return knownValue
}
guard let track = currentTrack(for: trackID) else {
throw UploadPipelineError.trackMissing
}
if let sha256 = track.sha256, !sha256.isEmpty {
return sha256
}
let fileURL = URL(fileURLWithPath: track.localFilePath)
let sha256 = try await withStoredFolderAccess {
try await Task.detached(priority: .utility) {
try SHA256FileHasher().hashFile(at: fileURL)
}.value
}
try await setTrackUploadState(
trackID: trackID,
status: track.uploadStatus ?? .localOnly,
remoteTrackId: track.remoteTrackId,
lastUploadError: track.lastUploadError,
progress: uploadProgressByTrackID[trackID],
sha256: sha256
)
return sha256
}
private func setTrackUploadState(
trackID: String,
status: LocalUploadStatus,
remoteTrackId: String?,
lastUploadError: String?,
progress: Double? = nil,
sha256: String? = nil
) async throws {
guard let index = tracks.firstIndex(where: { $0.id == trackID }) else {
throw UploadPipelineError.trackMissing
}
var updatedTrack = tracks[index]
updatedTrack.uploadStatus = status
updatedTrack.remoteTrackId = remoteTrackId
updatedTrack.lastUploadError = lastUploadError
if let sha256 {
updatedTrack.sha256 = sha256
}
tracks[index] = updatedTrack
playbackController.setCatalogTracks(tracks)
if let progress {
uploadProgressByTrackID[trackID] = progress
} else {
uploadProgressByTrackID.removeValue(forKey: trackID)
}
try await persistTrackState(updatedTrack)
}
private func persistTrackState(_ track: LibraryTrack) async throws {
guard var localTrack = try await trackRepository.findTrack(trackID: track.id) else {
throw UploadPipelineError.trackMissing
}
localTrack.title = track.title
localTrack.artist = track.artist
localTrack.album = track.album
localTrack.durationSeconds = track.durationSeconds
localTrack.localFilePath = track.localFilePath
localTrack.sha256 = track.sha256
localTrack.uploadStatus = track.uploadStatus ?? .localOnly
localTrack.remoteTrackId = track.remoteTrackId
localTrack.lastUploadError = track.lastUploadError
localTrack.updatedAt = Date()
try await trackRepository.saveLocalTrack(localTrack)
}
private func fileSize(at fileURL: URL) throws -> Int {
if let fileSize = try fileURL.resourceValues(forKeys: [.fileSizeKey]).fileSize {
return fileSize
}
let attributes = try fileManager.attributesOfItem(atPath: fileURL.path)
if let fileSize = attributes[.size] as? NSNumber {
return fileSize.intValue
}
throw UploadPipelineError.unreadableLocalFile
}
private func durationMilliseconds(from durationSeconds: Double?) -> Int? {
guard let durationSeconds, durationSeconds > 0 else {
return nil
}
return Int((durationSeconds * 1000).rounded())
}
private func withStoredFolderAccess<T>(
_ operation: () async throws -> T
) async throws -> T {
guard let folderURL = folderAccessService.storedFolderURL() else {
return try await operation()
}
let hasScopedAccess = folderURL.startAccessingSecurityScopedResource()
defer {
if hasScopedAccess {
folderURL.stopAccessingSecurityScopedResource()
}
}
return try await operation()
}
private func refreshSelectedTrackIfNeeded() {
guard let selectedTrackID else { return }
if !tracks.contains(where: { $0.id == selectedTrackID }) {
self.selectedTrackID = nil
}
}
private static func makeTrackRepository() -> any TrackRepository {
if let repository = try? SwiftDataTrackRepository() {
return repository
@ -400,6 +791,11 @@ final class MacLibraryViewModel {
private static let playbackSessionDefaultsKey = "velody.playback.session"
}
private enum UploadOutcome {
case success(remoteTrackId: String?)
case failure
}
private enum BackendConnectionError: LocalizedError {
case invalidServerURL
case missingDeviceIdentity
@ -409,7 +805,30 @@ private enum BackendConnectionError: LocalizedError {
case .invalidServerURL:
return "Enter a valid backend URL, such as http://localhost:3000."
case .missingDeviceIdentity:
return "Register this Mac before sending a heartbeat."
return "Register this Mac before uploading or sending a heartbeat."
}
}
}
private enum UploadPipelineError: LocalizedError {
case localFileMissing
case unreadableLocalFile
case invalidPrepareResponse
case uploadDidNotComplete(String)
case trackMissing
var errorDescription: String? {
switch self {
case .localFileMissing:
return "The local MP3 file could not be found."
case .unreadableLocalFile:
return "The local MP3 file could not be read."
case .invalidPrepareResponse:
return "The backend did not return an upload session."
case let .uploadDidNotComplete(status):
return "The backend reported upload status \(status) instead of COMPLETED."
case .trackMissing:
return "The selected track is no longer available in the local catalog."
}
}
}

View File

@ -145,10 +145,82 @@
]
}
},
"/api/v1/uploads/{uploadId}/file": {
"put": {
"operationId": "UploadsController_uploadFile_v1",
"parameters": [
{
"name": "uploadId",
"required": true,
"in": "path",
"schema": {
"type": "string"
}
}
],
"requestBody": {
"required": true,
"content": {
"audio/mpeg": {
"schema": {
"type": "string",
"format": "binary"
}
},
"audio/mp3": {
"schema": {
"type": "string",
"format": "binary"
}
},
"application/octet-stream": {
"schema": {
"type": "string",
"format": "binary"
}
}
}
},
"responses": {
"200": {
"description": "",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UploadSessionStatusResponseDto"
}
}
}
}
},
"tags": [
"uploads"
]
}
},
"/api/v1/uploads/{uploadId}/finalize": {
"post": {
"operationId": "UploadsController_finalize_v1",
"parameters": [],
"parameters": [
{
"name": "uploadId",
"required": true,
"in": "path",
"schema": {
"type": "string"
}
}
],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UploadFinalizeRequestDto"
}
}
}
},
"responses": {
"200": {
"description": "",
@ -161,7 +233,6 @@
}
}
},
"summary": "Reserved for the next milestone",
"tags": [
"uploads"
]
@ -414,6 +485,14 @@
"nextOffset": {
"type": "number",
"example": 0
},
"trackId": {
"type": "string",
"format": "uuid"
},
"assetId": {
"type": "string",
"format": "uuid"
}
},
"required": [
@ -447,6 +526,10 @@
"nextOffset": {
"type": "string",
"example": 0
},
"finalizedAt": {
"type": "string",
"example": "2026-05-28T12:00:00.000Z"
}
},
"required": [
@ -457,21 +540,46 @@
"nextOffset"
]
},
"UploadFinalizeResponseDto": {
"UploadFinalizeRequestDto": {
"type": "object",
"properties": {
"statusCode": {
"type": "number",
"example": 501
},
"message": {
"title": {
"type": "string",
"example": "Upload finalization is not implemented yet."
"example": "Track Title"
},
"artist": {
"type": "string",
"example": "Track Artist"
},
"album": {
"type": "string",
"example": "Album Title"
},
"durationMs": {
"type": "number",
"example": 245000
}
},
"required": [
"statusCode",
"message"
"title",
"artist"
]
},
"UploadFinalizeResponseDto": {
"type": "object",
"properties": {
"trackId": {
"type": "string",
"format": "uuid"
},
"assetId": {
"type": "string",
"format": "uuid"
}
},
"required": [
"trackId",
"assetId"
]
},
"LibraryTrackDto": {

View File

@ -0,0 +1,135 @@
CREATE TABLE "users" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"slug" TEXT NOT NULL,
"display_name" TEXT NOT NULL,
"is_default" BOOLEAN NOT NULL DEFAULT false,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "users_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX "users_slug_key" ON "users"("slug");
INSERT INTO "users" (
"id",
"slug",
"display_name",
"is_default",
"created_at",
"updated_at"
)
VALUES (
gen_random_uuid(),
'default-owner',
'Default Owner',
true,
CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP
)
ON CONFLICT ("slug") DO NOTHING;
ALTER TABLE "artwork_assets"
ALTER COLUMN "id" SET DEFAULT gen_random_uuid();
ALTER TABLE "audio_assets"
ALTER COLUMN "id" SET DEFAULT gen_random_uuid(),
ADD COLUMN "user_id" UUID;
ALTER TABLE "device_sync_cursors"
ALTER COLUMN "updated_at" SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE "devices"
ALTER COLUMN "id" SET DEFAULT gen_random_uuid(),
ALTER COLUMN "updated_at" SET DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "user_id" UUID;
ALTER TABLE "library_events"
ADD COLUMN "user_id" UUID;
ALTER TABLE "tracks"
ALTER COLUMN "id" SET DEFAULT gen_random_uuid(),
ALTER COLUMN "updated_at" SET DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "user_id" UUID;
ALTER TABLE "upload_sessions"
ALTER COLUMN "id" SET DEFAULT gen_random_uuid(),
ALTER COLUMN "updated_at" SET DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "user_id" UUID,
ADD COLUMN "track_id" UUID,
ADD COLUMN "audio_asset_id" UUID,
ADD COLUMN "original_filename" TEXT,
ADD COLUMN "completed_at" TIMESTAMP(3),
ADD COLUMN "finalized_at" TIMESTAMP(3);
UPDATE "devices"
SET "user_id" = (SELECT "id" FROM "users" WHERE "slug" = 'default-owner')
WHERE "user_id" IS NULL;
UPDATE "tracks"
SET "user_id" = (SELECT "id" FROM "users" WHERE "slug" = 'default-owner')
WHERE "user_id" IS NULL;
UPDATE "audio_assets"
SET "user_id" = (SELECT "id" FROM "users" WHERE "slug" = 'default-owner')
WHERE "user_id" IS NULL;
UPDATE "upload_sessions"
SET
"user_id" = (SELECT "id" FROM "users" WHERE "slug" = 'default-owner'),
"original_filename" = COALESCE("original_filename", "expected_sha256" || '.mp3')
WHERE "user_id" IS NULL
OR "original_filename" IS NULL;
UPDATE "library_events"
SET "user_id" = (SELECT "id" FROM "users" WHERE "slug" = 'default-owner')
WHERE "user_id" IS NULL;
ALTER TABLE "audio_assets"
ALTER COLUMN "user_id" SET NOT NULL;
ALTER TABLE "devices"
ALTER COLUMN "user_id" SET NOT NULL;
ALTER TABLE "library_events"
ALTER COLUMN "user_id" SET NOT NULL;
ALTER TABLE "tracks"
ALTER COLUMN "user_id" SET NOT NULL;
ALTER TABLE "upload_sessions"
ALTER COLUMN "user_id" SET NOT NULL,
ALTER COLUMN "original_filename" SET NOT NULL;
DROP INDEX "audio_assets_sha256_key";
CREATE UNIQUE INDEX "audio_assets_user_id_sha256_key" ON "audio_assets"("user_id", "sha256");
CREATE INDEX "audio_assets_user_id_idx" ON "audio_assets"("user_id");
CREATE INDEX "devices_user_id_idx" ON "devices"("user_id");
CREATE INDEX "library_events_user_id_idx" ON "library_events"("user_id");
CREATE INDEX "tracks_user_id_idx" ON "tracks"("user_id");
CREATE INDEX "upload_sessions_user_id_idx" ON "upload_sessions"("user_id");
ALTER TABLE "audio_assets"
ADD CONSTRAINT "audio_assets_user_id_fkey"
FOREIGN KEY ("user_id") REFERENCES "users"("id")
ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE "devices"
ADD CONSTRAINT "devices_user_id_fkey"
FOREIGN KEY ("user_id") REFERENCES "users"("id")
ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE "library_events"
ADD CONSTRAINT "library_events_user_id_fkey"
FOREIGN KEY ("user_id") REFERENCES "users"("id")
ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE "tracks"
ADD CONSTRAINT "tracks_user_id_fkey"
FOREIGN KEY ("user_id") REFERENCES "users"("id")
ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE "upload_sessions"
ADD CONSTRAINT "upload_sessions_user_id_fkey"
FOREIGN KEY ("user_id") REFERENCES "users"("id")
ON DELETE CASCADE ON UPDATE CASCADE;

View File

@ -7,8 +7,25 @@ datasource db {
url = env("DATABASE_URL")
}
model User {
id String @id @default(uuid()) @db.Uuid
slug String @unique
displayName String @map("display_name")
isDefault Boolean @default(false) @map("is_default")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
devices Device[]
tracks Track[]
audioAssets AudioAsset[]
uploadSessions UploadSession[]
libraryEvents LibraryEvent[]
@@map("users")
}
model Device {
id String @id @default(uuid()) @db.Uuid
userId String @db.Uuid @map("user_id")
platform DevicePlatform
deviceName String @map("device_name")
appVersion String @map("app_version")
@ -19,12 +36,15 @@ model Device {
uploadSessions UploadSession[]
syncCursor DeviceSyncCursor?
audioAssets AudioAsset[]
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@index([userId])
@@map("devices")
}
model Track {
id String @id @default(uuid()) @db.Uuid
userId String @db.Uuid @map("user_id")
primaryAudioAssetId String? @unique @db.Uuid @map("primary_audio_asset_id")
artworkAssetId String? @unique @db.Uuid @map("artwork_asset_id")
title String
@ -43,14 +63,17 @@ model Track {
primaryAudioAsset AudioAsset? @relation("PrimaryAudioAsset", fields: [primaryAudioAssetId], references: [id], onDelete: SetNull, onUpdate: Cascade)
artworkAsset ArtworkAsset? @relation("TrackArtwork", fields: [artworkAssetId], references: [id], onDelete: SetNull, onUpdate: Cascade)
audioAssets AudioAsset[] @relation("TrackAudioAssets")
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@index([userId])
@@map("tracks")
}
model AudioAsset {
id String @id @default(uuid()) @db.Uuid
userId String @db.Uuid @map("user_id")
trackId String? @db.Uuid @map("track_id")
sha256 String @unique
sha256 String
storageKey String @unique @map("storage_key")
originalFilename String @map("original_filename")
mimeType String @map("mime_type")
@ -65,7 +88,10 @@ model AudioAsset {
track Track? @relation("TrackAudioAssets", fields: [trackId], references: [id], onDelete: SetNull, onUpdate: Cascade)
primaryForTrack Track? @relation("PrimaryAudioAsset")
sourceDevice Device? @relation(fields: [sourceDeviceId], references: [id], onDelete: SetNull, onUpdate: Cascade)
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@unique([userId, sha256])
@@index([userId])
@@map("audio_assets")
}
@ -85,28 +111,39 @@ model ArtworkAsset {
model UploadSession {
id String @id @default(uuid()) @db.Uuid
userId String @db.Uuid @map("user_id")
deviceId String @db.Uuid @map("device_id")
trackId String? @db.Uuid @map("track_id")
audioAssetId String? @db.Uuid @map("audio_asset_id")
expectedSha256 String @map("expected_sha256")
originalFilename String @map("original_filename")
expectedSizeBytes BigInt @map("expected_size_bytes")
receivedBytes BigInt @default(0) @map("received_bytes")
tempStoragePath String @map("temp_storage_path")
status UploadSessionStatus @default(PENDING)
completedAt DateTime? @map("completed_at")
finalizedAt DateTime? @map("finalized_at")
expiresAt DateTime @map("expires_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
device Device @relation(fields: [deviceId], references: [id], onDelete: Cascade, onUpdate: Cascade)
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@index([userId])
@@map("upload_sessions")
}
model LibraryEvent {
id BigInt @id @default(autoincrement())
userId String @db.Uuid @map("user_id")
entityType EntityType @map("entity_type")
entityId String @db.Uuid @map("entity_id")
action EventAction
payloadVersion Int @default(1) @map("payload_version")
createdAt DateTime @default(now()) @map("created_at")
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@index([userId])
@@map("library_events")
}

View File

@ -2,9 +2,10 @@ import { Module } from '@nestjs/common';
import { PrismaModule } from '../../infrastructure/database/prisma.module';
import { DevicesController } from './devices.controller';
import { DevicesService } from './devices.service';
import { UsersModule } from '../users/users.module';
@Module({
imports: [PrismaModule],
imports: [PrismaModule, UsersModule],
controllers: [DevicesController],
providers: [DevicesService],
exports: [DevicesService],

View File

@ -1,6 +1,7 @@
import { Injectable, NotFoundException } from '@nestjs/common';
import { createHash, randomBytes } from 'node:crypto';
import { PrismaService } from '../../infrastructure/database/prisma.service';
import { DefaultUserService } from '../users/default-user.service';
import {
DeviceHeartbeatRequestDto,
DeviceHeartbeatResponseDto,
@ -10,7 +11,10 @@ import {
@Injectable()
export class DevicesService {
constructor(private readonly prismaService: PrismaService) {}
constructor(
private readonly prismaService: PrismaService,
private readonly defaultUserService: DefaultUserService,
) {}
async register(
body: RegisterDeviceRequestDto,
@ -19,9 +23,11 @@ export class DevicesService {
const installTokenHash = createHash('sha256')
.update(bootstrapToken)
.digest('hex');
const defaultUser = await this.defaultUserService.getOrCreateDefaultUser();
const device = await this.prismaService.device.create({
data: {
userId: defaultUser.id,
platform: body.platform,
deviceName: body.deviceName,
appVersion: body.appVersion,

View File

@ -1,7 +1,10 @@
import { Module } from '@nestjs/common';
import { PrismaModule } from '../../infrastructure/database/prisma.module';
import { UsersModule } from '../users/users.module';
import { LibraryService } from './library.service';
@Module({
imports: [PrismaModule, UsersModule],
providers: [LibraryService],
exports: [LibraryService],
})

View File

@ -1,9 +1,36 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/database/prisma.service';
import { LibraryTrackDto } from '../sync/sync.dto';
import { DefaultUserService } from '../users/default-user.service';
@Injectable()
export class LibraryService {
constructor(
private readonly prismaService: PrismaService,
private readonly defaultUserService: DefaultUserService,
) {}
async getBootstrapTracks(): Promise<LibraryTrackDto[]> {
return [];
const defaultUser = await this.defaultUserService.getOrCreateDefaultUser();
const tracks = await this.prismaService.track.findMany({
where: {
userId: defaultUser.id,
status: 'ACTIVE',
},
orderBy: {
createdAt: 'asc',
},
select: {
id: true,
title: true,
artist: true,
},
});
return tracks.map((track) => ({
id: track.id,
title: track.title,
artist: track.artist,
}));
}
}

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { mkdir, access } from 'node:fs/promises';
import { constants } from 'node:fs';
import { join } from 'node:path';
import { dirname, join } from 'node:path';
import { AppConfigService } from '../config/config.service';
export interface StorageStatus {
@ -17,14 +17,44 @@ export class LocalFilesystemStorageService {
return this.configService.storageRoot;
}
resolve(relativePath: string): string {
return join(this.root, relativePath);
}
userAudioAssetStorageKey(userId: string, sha256: string): string {
return join('users', userId, 'audio', `${sha256}.mp3`);
}
userAudioAssetPath(userId: string, sha256: string): string {
return this.resolve(this.userAudioAssetStorageKey(userId, sha256));
}
tempUploadStorageKey(uploadId: string): string {
return join('temp', 'uploads', `${uploadId}.part`);
}
tempUploadPath(uploadId: string): string {
return this.resolve(this.tempUploadStorageKey(uploadId));
}
async ensureDirectory(path: string): Promise<void> {
await mkdir(path, { recursive: true });
}
async ensureParentDirectory(path: string): Promise<void> {
await this.ensureDirectory(dirname(path));
}
async checkReadiness(): Promise<StorageStatus> {
const paths = [
this.root,
join(this.root, 'users'),
join(this.root, 'temp'),
join(this.root, 'temp', 'uploads'),
join(this.root, 'incoming'),
join(this.root, 'quarantine'),
join(this.root, 'library', 'audio'),
join(this.root, 'library', 'artwork'),
join(this.root, 'temp'),
];
for (const path of paths) {

View File

@ -1,11 +1,12 @@
import { Module } from '@nestjs/common';
import { PrismaModule } from '../../infrastructure/database/prisma.module';
import { LibraryModule } from '../library/library.module';
import { UsersModule } from '../users/users.module';
import { SyncController } from './sync.controller';
import { SyncService } from './sync.service';
@Module({
imports: [PrismaModule, LibraryModule],
imports: [PrismaModule, LibraryModule, UsersModule],
controllers: [SyncController],
providers: [SyncService],
})

View File

@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/database/prisma.service';
import { LibraryService } from '../library/library.service';
import { DefaultUserService } from '../users/default-user.service';
import { SyncBootstrapResponseDto, SyncChangesResponseDto } from './sync.dto';
@Injectable()
@ -8,6 +9,7 @@ export class SyncService {
constructor(
private readonly prismaService: PrismaService,
private readonly libraryService: LibraryService,
private readonly defaultUserService: DefaultUserService,
) {}
async bootstrap(): Promise<SyncBootstrapResponseDto> {
@ -37,7 +39,11 @@ export class SyncService {
}
private async getLatestCursor(): Promise<string> {
const defaultUser = await this.defaultUserService.getOrCreateDefaultUser();
const latest = await this.prismaService.libraryEvent.findFirst({
where: {
userId: defaultUser.id,
},
orderBy: {
id: 'desc',
},

View File

@ -2,17 +2,21 @@ import {
Body,
Controller,
Get,
NotImplementedException,
Param,
Post,
Put,
Req,
} from '@nestjs/common';
import type { Request } from 'express';
import {
ApiBody,
ApiCreatedResponse,
ApiConsumes,
ApiOkResponse,
ApiOperation,
ApiTags,
} from '@nestjs/swagger';
import {
UploadFinalizeRequestDto,
UploadFinalizeResponseDto,
UploadPrepareRequestDto,
UploadPrepareResponseDto,
@ -44,10 +48,28 @@ export class UploadsController {
return this.uploadsService.getStatus(uploadId);
}
@Put(':uploadId/file')
@ApiConsumes('audio/mpeg', 'audio/mp3', 'application/octet-stream')
@ApiBody({
schema: {
type: 'string',
format: 'binary',
},
})
@ApiOkResponse({ type: UploadSessionStatusResponseDto })
async uploadFile(
@Param('uploadId') uploadId: string,
@Req() request: Request,
): Promise<UploadSessionStatusResponseDto> {
return this.uploadsService.uploadFile(uploadId, request);
}
@Post(':uploadId/finalize')
@ApiOperation({ summary: 'Reserved for the next milestone' })
@ApiOkResponse({ type: UploadFinalizeResponseDto })
async finalize(): Promise<UploadFinalizeResponseDto> {
throw new NotImplementedException('Upload finalization is not implemented yet.');
async finalize(
@Param('uploadId') uploadId: string,
@Body() body: UploadFinalizeRequestDto,
): Promise<UploadFinalizeResponseDto> {
return this.uploadsService.finalize(uploadId, body);
}
}

View File

@ -1,6 +1,15 @@
import { ApiProperty } from '@nestjs/swagger';
import { UploadSessionStatus } from '@prisma/client';
import { IsInt, IsString, IsUUID, Matches, Max, Min } from 'class-validator';
import {
IsInt,
IsOptional,
IsString,
IsUUID,
Matches,
Max,
Min,
MinLength,
} from 'class-validator';
export class UploadPrepareRequestDto {
@ApiProperty({ format: 'uuid' })
@ -16,6 +25,7 @@ export class UploadPrepareRequestDto {
@ApiProperty({ example: 'track.mp3' })
@IsString()
@Matches(/\.mp3$/i)
originalFilename!: string;
@ApiProperty({ example: 10485760 })
@ -34,6 +44,12 @@ export class UploadPrepareResponseDto {
@ApiProperty({ required: false, example: 0 })
nextOffset?: number;
@ApiProperty({ required: false, format: 'uuid' })
trackId?: string;
@ApiProperty({ required: false, format: 'uuid' })
assetId?: string;
}
export class UploadSessionStatusResponseDto {
@ -51,12 +67,39 @@ export class UploadSessionStatusResponseDto {
@ApiProperty({ example: 0 })
nextOffset!: string;
@ApiProperty({ required: false, example: '2026-05-28T12:00:00.000Z' })
finalizedAt?: string;
}
export class UploadFinalizeRequestDto {
@ApiProperty({ example: 'Track Title' })
@IsString()
@MinLength(1)
title!: string;
@ApiProperty({ example: 'Track Artist' })
@IsString()
@MinLength(1)
artist!: string;
@ApiProperty({ required: false, example: 'Album Title' })
@IsOptional()
@IsString()
album?: string;
@ApiProperty({ required: false, example: 245000 })
@IsOptional()
@IsInt()
@Min(1)
@Max(Number.MAX_SAFE_INTEGER)
durationMs?: number;
}
export class UploadFinalizeResponseDto {
@ApiProperty({ example: 501 })
statusCode!: number;
@ApiProperty({ format: 'uuid' })
trackId!: string;
@ApiProperty({ example: 'Upload finalization is not implemented yet.' })
message!: string;
@ApiProperty({ format: 'uuid' })
assetId!: string;
}

View File

@ -1,10 +1,12 @@
import { Module } from '@nestjs/common';
import { PrismaModule } from '../../infrastructure/database/prisma.module';
import { StorageModule } from '../storage/storage.module';
import { UsersModule } from '../users/users.module';
import { UploadsController } from './uploads.controller';
import { UploadsService } from './uploads.service';
@Module({
imports: [PrismaModule],
imports: [PrismaModule, StorageModule, UsersModule],
controllers: [UploadsController],
providers: [UploadsService],
})

View File

@ -0,0 +1,405 @@
import { randomUUID, createHash } from 'node:crypto';
import { readFile, mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Readable } from 'node:stream';
import { UnprocessableEntityException } from '@nestjs/common';
import { UploadSessionStatus } from '@prisma/client';
import { AppConfigService } from '../config/config.service';
import { LocalFilesystemStorageService } from '../storage/storage.service';
import { UploadsService } from './uploads.service';
type MockState = ReturnType<typeof createPrismaMock>['state'];
function createPrismaMock() {
const users = new Map<string, any>();
const devices = new Map<string, any>();
const tracks = new Map<string, any>();
const audioAssets = new Map<string, any>();
const uploadSessions = new Map<string, any>();
const libraryEvents = new Map<bigint, any>();
let nextLibraryEventId = 1n;
const defaultUser = {
id: randomUUID(),
slug: 'default-owner',
displayName: 'Default Owner',
isDefault: true,
createdAt: new Date(),
updatedAt: new Date(),
};
users.set(defaultUser.id, defaultUser);
const prismaMock: any = {
$queryRawUnsafe: jest.fn().mockResolvedValue([{ '?column?': 1 }]),
$transaction: jest.fn().mockImplementation(async (callback: any) => callback(prismaMock)),
user: {
upsert: jest.fn().mockResolvedValue(defaultUser),
},
device: {
findUnique: jest.fn().mockImplementation(async ({ where }) => {
return devices.get(where.id) ?? null;
}),
create: jest.fn().mockImplementation(async ({ data }) => {
const record = {
id: data.id ?? randomUUID(),
createdAt: new Date(),
updatedAt: new Date(),
...data,
};
devices.set(record.id, record);
return record;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = devices.get(where.id);
const updated = {
...current,
...data,
updatedAt: new Date(),
};
devices.set(where.id, updated);
return updated;
}),
},
track: {
findUnique: jest.fn().mockImplementation(async ({ where }) => {
return tracks.get(where.id) ?? null;
}),
findMany: jest.fn().mockImplementation(async ({ where }) => {
return [...tracks.values()]
.filter((track) => {
const userMatches = where?.userId ? track.userId === where.userId : true;
const statusMatches = where?.status ? track.status === where.status : true;
return userMatches && statusMatches;
})
.sort((lhs, rhs) => lhs.createdAt.getTime() - rhs.createdAt.getTime());
}),
create: jest.fn().mockImplementation(async ({ data }) => {
const now = new Date();
const record = {
id: randomUUID(),
primaryAudioAssetId: null,
artworkAssetId: null,
albumArtist: null,
genre: null,
discNumber: null,
trackNumber: null,
year: null,
deletedAt: null,
createdAt: now,
updatedAt: now,
...data,
};
tracks.set(record.id, record);
return record;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = tracks.get(where.id);
const updated = {
...current,
...data,
updatedAt: new Date(),
};
tracks.set(where.id, updated);
return updated;
}),
},
audioAsset: {
findUnique: jest.fn().mockImplementation(async ({ where }) => {
if (where.id) {
return audioAssets.get(where.id) ?? null;
}
const composite = where.userId_sha256;
if (!composite) {
return null;
}
return (
[...audioAssets.values()].find(
(asset) =>
asset.userId === composite.userId &&
asset.sha256 === composite.sha256,
) ?? null
);
}),
create: jest.fn().mockImplementation(async ({ data }) => {
const record = {
id: randomUUID(),
bitRateKbps: null,
sampleRateHz: null,
channels: null,
createdAt: new Date(),
...data,
};
audioAssets.set(record.id, record);
return record;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = audioAssets.get(where.id);
const updated = {
...current,
...data,
};
audioAssets.set(where.id, updated);
return updated;
}),
},
uploadSession: {
create: jest.fn().mockImplementation(async ({ data }) => {
const now = new Date();
const record = {
createdAt: now,
updatedAt: now,
completedAt: null,
finalizedAt: null,
trackId: null,
audioAssetId: null,
...data,
};
uploadSessions.set(record.id, record);
return record;
}),
findUnique: jest.fn().mockImplementation(async ({ where }) => {
return uploadSessions.get(where.id) ?? null;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = uploadSessions.get(where.id);
const updated = {
...current,
...data,
updatedAt: new Date(),
};
uploadSessions.set(where.id, updated);
return updated;
}),
},
libraryEvent: {
create: jest.fn().mockImplementation(async ({ data }) => {
const record = {
id: nextLibraryEventId,
payloadVersion: 1,
createdAt: new Date(),
...data,
};
libraryEvents.set(record.id, record);
nextLibraryEventId += 1n;
return record;
}),
findFirst: jest.fn().mockImplementation(async ({ where }) => {
const filteredEvents = [...libraryEvents.values()].filter((event) =>
where?.userId ? event.userId === where.userId : true,
);
return filteredEvents.sort((lhs, rhs) => Number(rhs.id - lhs.id))[0] ?? null;
}),
},
state: {
defaultUser,
users,
devices,
tracks,
audioAssets,
uploadSessions,
libraryEvents,
},
};
return {
prismaMock,
state: prismaMock.state,
};
}
function createAppConfig(storageRoot: string): AppConfigService {
return {
maxUploadSizeBytes: 10 * 1024 * 1024,
storageRoot,
} as AppConfigService;
}
function createUploadRequest(data: Buffer): any {
const request = Readable.from([data]) as any;
request.headers = {
'content-type': 'audio/mpeg',
'content-length': String(data.length),
};
return request;
}
function sampleMp3Bytes(seed: string): Buffer {
return Buffer.concat([
Buffer.from('ID3', 'ascii'),
Buffer.from([0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x21]),
Buffer.from(seed, 'utf8'),
]);
}
function sha256Hex(data: Buffer): string {
return createHash('sha256').update(data).digest('hex');
}
describe('UploadsService', () => {
let prismaMock: any;
let state: MockState;
let storageRoot: string;
let storageService: LocalFilesystemStorageService;
let service: UploadsService;
beforeEach(async () => {
const mock = createPrismaMock();
prismaMock = mock.prismaMock;
state = mock.state;
storageRoot = await mkdtemp(join(tmpdir(), 'velody-upload-spec-'));
storageService = new LocalFilesystemStorageService(createAppConfig(storageRoot));
service = new UploadsService(
prismaMock,
createAppConfig(storageRoot),
storageService,
);
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
function seedDevice() {
const deviceId = randomUUID();
const device = {
id: deviceId,
userId: state.defaultUser.id,
platform: 'MACOS',
deviceName: 'Velody Mac',
appVersion: '0.1.0',
installTokenHash: sha256Hex(Buffer.from(deviceId, 'utf8')),
lastSeenAt: new Date(),
createdAt: new Date(),
updatedAt: new Date(),
};
state.devices.set(deviceId, device);
return device;
}
it('returns upload_required for a new sha during prepare', async () => {
const device = seedDevice();
const bytes = sampleMp3Bytes('prepare-new-sha');
const response = await service.prepare({
deviceId: device.id,
sha256: sha256Hex(bytes),
originalFilename: 'track.mp3',
sizeBytes: bytes.length,
});
expect(response.status).toBe('upload_required');
expect(response.uploadId).toBeDefined();
expect(response.nextOffset).toBe(0);
expect(state.uploadSessions.size).toBe(1);
});
it('rejects an upload whose bytes do not match the prepared sha', async () => {
const device = seedDevice();
const uploadedBytes = sampleMp3Bytes('wrong-sha-upload');
const response = await service.prepare({
deviceId: device.id,
sha256: sha256Hex(sampleMp3Bytes('different-bytes')),
originalFilename: 'mismatch.mp3',
sizeBytes: uploadedBytes.length,
});
await expect(
service.uploadFile(response.uploadId!, createUploadRequest(uploadedBytes)),
).rejects.toBeInstanceOf(UnprocessableEntityException);
const session = state.uploadSessions.get(response.uploadId!);
expect(session.status).toBe(UploadSessionStatus.FAILED);
});
it('stores valid MP3 bytes under the deterministic user storage path', async () => {
const device = seedDevice();
const uploadedBytes = sampleMp3Bytes('valid-upload');
const sha256 = sha256Hex(uploadedBytes);
const response = await service.prepare({
deviceId: device.id,
sha256,
originalFilename: 'valid-upload.mp3',
sizeBytes: uploadedBytes.length,
});
const status = await service.uploadFile(
response.uploadId!,
createUploadRequest(uploadedBytes),
);
expect(status.status).toBe(UploadSessionStatus.COMPLETED);
const storedBytes = await readFile(
join(storageRoot, 'users', state.defaultUser.id, 'audio', `${sha256}.mp3`),
);
expect(storedBytes.equals(uploadedBytes)).toBe(true);
});
it('finalize creates track, audio_asset, and library_event records', async () => {
const device = seedDevice();
const uploadedBytes = sampleMp3Bytes('finalize-records');
const sha256 = sha256Hex(uploadedBytes);
const response = await service.prepare({
deviceId: device.id,
sha256,
originalFilename: 'finalize-records.mp3',
sizeBytes: uploadedBytes.length,
});
await service.uploadFile(response.uploadId!, createUploadRequest(uploadedBytes));
const finalizeResponse = await service.finalize(response.uploadId!, {
title: 'Finalize Track',
artist: 'Velody',
album: 'Milestone 6',
durationMs: 245000,
});
expect(finalizeResponse.trackId).toBeDefined();
expect(finalizeResponse.assetId).toBeDefined();
expect(state.tracks.size).toBe(1);
expect(state.audioAssets.size).toBe(1);
expect(state.libraryEvents.size).toBe(1);
const session = state.uploadSessions.get(response.uploadId!);
expect(session.finalizedAt).toBeInstanceOf(Date);
expect(session.trackId).toBe(finalizeResponse.trackId);
expect(session.audioAssetId).toBe(finalizeResponse.assetId);
});
it('returns exists from prepare after a successful upload and finalize', async () => {
const device = seedDevice();
const uploadedBytes = sampleMp3Bytes('duplicate-handling');
const sha256 = sha256Hex(uploadedBytes);
const firstPrepare = await service.prepare({
deviceId: device.id,
sha256,
originalFilename: 'duplicate-handling.mp3',
sizeBytes: uploadedBytes.length,
});
await service.uploadFile(
firstPrepare.uploadId!,
createUploadRequest(uploadedBytes),
);
const finalizeResponse = await service.finalize(firstPrepare.uploadId!, {
title: 'Duplicate Track',
artist: 'Velody',
album: 'Milestone 6',
durationMs: 123000,
});
const secondPrepare = await service.prepare({
deviceId: device.id,
sha256,
originalFilename: 'duplicate-handling.mp3',
sizeBytes: uploadedBytes.length,
});
expect(secondPrepare.status).toBe('exists');
expect(secondPrepare.trackId).toBe(finalizeResponse.trackId);
expect(secondPrepare.assetId).toBe(finalizeResponse.assetId);
});
});

View File

@ -3,25 +3,41 @@ import {
NotFoundException,
UnprocessableEntityException,
} from '@nestjs/common';
import { join } from 'node:path';
import {
EntityType,
EventAction,
type UploadSession,
UploadSessionStatus,
} from '@prisma/client';
import type { Request } from 'express';
import { createHash, randomUUID } from 'node:crypto';
import { constants } from 'node:fs';
import { access, open, rename, unlink } from 'node:fs/promises';
import { extname } from 'node:path';
import { PrismaService } from '../../infrastructure/database/prisma.service';
import { AppConfigService } from '../config/config.service';
import { UploadPrepareRequestDto, UploadPrepareResponseDto, UploadSessionStatusResponseDto } from './uploads.dto';
import { UploadSessionStatus } from '@prisma/client';
import { LocalFilesystemStorageService } from '../storage/storage.service';
import {
UploadFinalizeRequestDto,
UploadFinalizeResponseDto,
UploadPrepareRequestDto,
UploadPrepareResponseDto,
UploadSessionStatusResponseDto,
} from './uploads.dto';
@Injectable()
export class UploadsService {
constructor(
private readonly prismaService: PrismaService,
private readonly configService: AppConfigService,
private readonly storageService: LocalFilesystemStorageService,
) {}
async prepare(
body: UploadPrepareRequestDto,
): Promise<UploadPrepareResponseDto> {
if (body.sizeBytes > this.configService.maxUploadSizeBytes) {
throw new UnprocessableEntityException('Upload exceeds the configured maximum size.');
}
this.assertFileSizeWithinLimit(body.sizeBytes);
this.assertMp3Filename(body.originalFilename);
const device = await this.prismaService.device.findUnique({
where: { id: body.deviceId },
@ -32,22 +48,33 @@ export class UploadsService {
}
const existingAsset = await this.prismaService.audioAsset.findUnique({
where: { sha256: body.sha256 },
where: {
userId_sha256: {
userId: device.userId,
sha256: body.sha256,
},
},
});
if (existingAsset) {
return {
status: 'exists',
trackId: existingAsset.trackId ?? undefined,
assetId: existingAsset.id,
};
}
const uploadId = randomUUID();
const uploadSession = await this.prismaService.uploadSession.create({
data: {
id: uploadId,
userId: device.userId,
deviceId: body.deviceId,
expectedSha256: body.sha256,
originalFilename: body.originalFilename,
expectedSizeBytes: BigInt(body.sizeBytes),
receivedBytes: BigInt(0),
tempStoragePath: join('incoming', `${body.sha256}.part`),
tempStoragePath: this.storageService.tempUploadStorageKey(uploadId),
status: UploadSessionStatus.READY_TO_UPLOAD,
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000),
},
@ -61,6 +88,306 @@ export class UploadsService {
}
async getStatus(uploadId: string): Promise<UploadSessionStatusResponseDto> {
return this.toStatusResponse(await this.getUploadSessionOrThrow(uploadId));
}
async uploadFile(
uploadId: string,
request: Request,
): Promise<UploadSessionStatusResponseDto> {
const uploadSession = await this.getUploadSessionOrThrow(uploadId);
if (uploadSession.status === UploadSessionStatus.COMPLETED) {
return this.toStatusResponse(uploadSession);
}
this.assertUploadSessionReadyForBinaryTransfer(uploadSession);
this.assertAllowedMimeType(request.headers['content-type']);
const contentLength = this.parseContentLength(
request.headers['content-length'],
);
if (contentLength !== undefined) {
this.assertFileSizeWithinLimit(contentLength);
if (contentLength !== Number(uploadSession.expectedSizeBytes)) {
throw new UnprocessableEntityException(
'Upload size does not match the prepared session.',
);
}
}
const tempPath = this.storageService.resolve(uploadSession.tempStoragePath);
const finalPath = this.storageService.userAudioAssetPath(
uploadSession.userId,
uploadSession.expectedSha256,
);
await this.storageService.ensureParentDirectory(tempPath);
await this.storageService.ensureParentDirectory(finalPath);
await this.safeUnlink(tempPath);
const fileHandle = await open(tempPath, 'w');
const hasher = createHash('sha256');
const expectedSizeBytes = Number(uploadSession.expectedSizeBytes);
let receivedBytes = 0;
let sniffedBytes = Buffer.alloc(0);
try {
for await (const chunkValue of request) {
const chunk = Buffer.isBuffer(chunkValue)
? chunkValue
: Buffer.from(chunkValue);
receivedBytes += chunk.byteLength;
if (receivedBytes > expectedSizeBytes) {
throw new UnprocessableEntityException(
'Upload size exceeds the prepared session size.',
);
}
if (sniffedBytes.length < 4) {
sniffedBytes = Buffer.concat([
sniffedBytes,
chunk.subarray(0, Math.max(0, 4 - sniffedBytes.length)),
]).subarray(0, 4);
}
hasher.update(chunk);
await fileHandle.write(chunk);
}
} catch (error) {
await fileHandle.close();
await this.markUploadFailed(uploadId, receivedBytes);
await this.safeUnlink(tempPath);
throw error;
}
await fileHandle.close();
if (receivedBytes !== expectedSizeBytes) {
await this.markUploadFailed(uploadId, receivedBytes);
await this.safeUnlink(tempPath);
throw new UnprocessableEntityException(
'Uploaded bytes do not match the prepared session size.',
);
}
if (!this.looksLikeMp3(sniffedBytes)) {
await this.markUploadFailed(uploadId, receivedBytes);
await this.safeUnlink(tempPath);
throw new UnprocessableEntityException('Uploaded file is not a valid MP3.');
}
const actualSha256 = hasher.digest('hex');
if (actualSha256 !== uploadSession.expectedSha256) {
await this.markUploadFailed(uploadId, receivedBytes);
await this.safeUnlink(tempPath);
throw new UnprocessableEntityException(
'Uploaded file hash does not match the prepared session hash.',
);
}
if (await this.fileExists(finalPath)) {
await this.safeUnlink(tempPath);
} else {
await rename(tempPath, finalPath);
}
const updatedSession = await this.prismaService.uploadSession.update({
where: { id: uploadId },
data: {
receivedBytes: BigInt(receivedBytes),
status: UploadSessionStatus.COMPLETED,
completedAt: new Date(),
},
});
return this.toStatusResponse(updatedSession);
}
async finalize(
uploadId: string,
body: UploadFinalizeRequestDto,
): Promise<UploadFinalizeResponseDto> {
const uploadSession = await this.getUploadSessionOrThrow(uploadId);
if (
uploadSession.finalizedAt &&
uploadSession.trackId &&
uploadSession.audioAssetId
) {
return {
trackId: uploadSession.trackId,
assetId: uploadSession.audioAssetId,
};
}
if (uploadSession.status !== UploadSessionStatus.COMPLETED) {
throw new UnprocessableEntityException(
'Upload must complete before it can be finalized.',
);
}
const finalStorageKey = this.storageService.userAudioAssetStorageKey(
uploadSession.userId,
uploadSession.expectedSha256,
);
const finalPath = this.storageService.resolve(finalStorageKey);
if (!(await this.fileExists(finalPath))) {
throw new UnprocessableEntityException(
'Uploaded file is missing from storage and cannot be finalized.',
);
}
const title = body.title.trim();
const artist = body.artist.trim();
const album = this.trimOptional(body.album);
if (!title || !artist) {
throw new UnprocessableEntityException(
'Track title and artist are required to finalize an upload.',
);
}
return this.prismaService.$transaction(async (tx) => {
const currentSession = await tx.uploadSession.findUnique({
where: { id: uploadId },
});
if (!currentSession) {
throw new NotFoundException('Upload session not found');
}
if (
currentSession.finalizedAt &&
currentSession.trackId &&
currentSession.audioAssetId
) {
return {
trackId: currentSession.trackId,
assetId: currentSession.audioAssetId,
};
}
if (currentSession.status !== UploadSessionStatus.COMPLETED) {
throw new UnprocessableEntityException(
'Upload must complete before it can be finalized.',
);
}
let audioAsset = await tx.audioAsset.findUnique({
where: {
userId_sha256: {
userId: currentSession.userId,
sha256: currentSession.expectedSha256,
},
},
});
let track =
currentSession.trackId != null
? await tx.track.findUnique({ where: { id: currentSession.trackId } })
: null;
if (!track && audioAsset?.trackId) {
track = await tx.track.findUnique({
where: { id: audioAsset.trackId },
});
}
const createdTrack = !track;
if (!track) {
track = await tx.track.create({
data: {
userId: currentSession.userId,
title,
artist,
album,
durationMs: body.durationMs,
status: 'ACTIVE',
},
});
}
if (audioAsset) {
const nextDurationMs = body.durationMs ?? audioAsset.durationMs;
const shouldUpdateAsset =
audioAsset.trackId !== track.id ||
audioAsset.originalFilename !== currentSession.originalFilename ||
audioAsset.mimeType !== 'audio/mpeg' ||
audioAsset.fileExtension !== 'mp3' ||
audioAsset.fileSizeBytes !== currentSession.expectedSizeBytes ||
audioAsset.sourceDeviceId !== currentSession.deviceId ||
audioAsset.durationMs !== nextDurationMs;
if (shouldUpdateAsset) {
audioAsset = await tx.audioAsset.update({
where: { id: audioAsset.id },
data: {
trackId: track.id,
originalFilename: currentSession.originalFilename,
mimeType: 'audio/mpeg',
fileExtension: 'mp3',
fileSizeBytes: currentSession.expectedSizeBytes,
sourceDeviceId: currentSession.deviceId,
durationMs: nextDurationMs,
},
});
}
} else {
audioAsset = await tx.audioAsset.create({
data: {
userId: currentSession.userId,
trackId: track.id,
sha256: currentSession.expectedSha256,
storageKey: finalStorageKey,
originalFilename: currentSession.originalFilename,
mimeType: 'audio/mpeg',
fileExtension: 'mp3',
fileSizeBytes: currentSession.expectedSizeBytes,
durationMs: body.durationMs,
sourceDeviceId: currentSession.deviceId,
},
});
}
if (track.primaryAudioAssetId !== audioAsset.id) {
track = await tx.track.update({
where: { id: track.id },
data: {
primaryAudioAssetId: audioAsset.id,
},
});
}
await tx.libraryEvent.create({
data: {
userId: currentSession.userId,
entityType: EntityType.TRACK,
entityId: track.id,
action: createdTrack ? EventAction.CREATED : EventAction.UPDATED,
},
});
await tx.uploadSession.update({
where: { id: currentSession.id },
data: {
trackId: track.id,
audioAssetId: audioAsset.id,
finalizedAt: new Date(),
status: UploadSessionStatus.COMPLETED,
},
});
return {
trackId: track.id,
assetId: audioAsset.id,
};
});
}
private async getUploadSessionOrThrow(uploadId: string): Promise<UploadSession> {
const uploadSession = await this.prismaService.uploadSession.findUnique({
where: { id: uploadId },
});
@ -69,12 +396,149 @@ export class UploadsService {
throw new NotFoundException('Upload session not found');
}
return uploadSession;
}
private toStatusResponse(
uploadSession: Pick<
UploadSession,
'id' | 'status' | 'receivedBytes' | 'expectedSizeBytes' | 'finalizedAt'
>,
): UploadSessionStatusResponseDto {
return {
uploadId: uploadSession.id,
status: uploadSession.status,
receivedBytes: uploadSession.receivedBytes.toString(),
expectedSizeBytes: uploadSession.expectedSizeBytes.toString(),
nextOffset: uploadSession.receivedBytes.toString(),
finalizedAt: uploadSession.finalizedAt?.toISOString(),
};
}
private assertFileSizeWithinLimit(sizeBytes: number): void {
if (sizeBytes > this.configService.maxUploadSizeBytes) {
throw new UnprocessableEntityException(
'Upload exceeds the configured maximum size.',
);
}
}
private assertMp3Filename(filename: string): void {
if (extname(filename).toLowerCase() !== '.mp3') {
throw new UnprocessableEntityException('Only MP3 uploads are supported.');
}
}
private assertUploadSessionReadyForBinaryTransfer(
uploadSession: Pick<UploadSession, 'status' | 'expiresAt' | 'originalFilename'>,
): void {
if (
uploadSession.status !== UploadSessionStatus.READY_TO_UPLOAD &&
uploadSession.status !== UploadSessionStatus.FAILED
) {
throw new UnprocessableEntityException(
'Upload session is not ready to receive file bytes.',
);
}
if (uploadSession.expiresAt.getTime() <= Date.now()) {
throw new UnprocessableEntityException('Upload session has expired.');
}
this.assertMp3Filename(uploadSession.originalFilename);
}
private assertAllowedMimeType(contentTypeHeader?: string | string[]): void {
const contentType = Array.isArray(contentTypeHeader)
? contentTypeHeader[0]
: contentTypeHeader;
const normalizedContentType = contentType
?.split(';', 1)[0]
?.trim()
.toLowerCase();
const allowedMimeTypes = new Set([
'audio/mpeg',
'audio/mp3',
'application/octet-stream',
]);
if (
normalizedContentType != null &&
normalizedContentType.length > 0 &&
!allowedMimeTypes.has(normalizedContentType)
) {
throw new UnprocessableEntityException(
'Only MP3 audio uploads are accepted.',
);
}
}
private parseContentLength(
contentLengthHeader?: string | string[],
): number | undefined {
const rawValue = Array.isArray(contentLengthHeader)
? contentLengthHeader[0]
: contentLengthHeader;
if (!rawValue) {
return undefined;
}
const parsedValue = Number(rawValue);
if (!Number.isFinite(parsedValue) || parsedValue < 0) {
throw new UnprocessableEntityException('Invalid Content-Length header.');
}
return parsedValue;
}
private looksLikeMp3(sniffedBytes: Buffer): boolean {
if (
sniffedBytes.length >= 3 &&
sniffedBytes.subarray(0, 3).equals(Buffer.from('ID3'))
) {
return true;
}
return (
sniffedBytes.length >= 2 &&
sniffedBytes[0] === 0xff &&
(sniffedBytes[1] & 0xe0) === 0xe0
);
}
private async markUploadFailed(
uploadId: string,
receivedBytes: number,
): Promise<void> {
await this.prismaService.uploadSession.update({
where: { id: uploadId },
data: {
status: UploadSessionStatus.FAILED,
receivedBytes: BigInt(receivedBytes),
},
});
}
private async fileExists(path: string): Promise<boolean> {
try {
await access(path, constants.F_OK);
return true;
} catch {
return false;
}
}
private async safeUnlink(path: string): Promise<void> {
try {
await unlink(path);
} catch {
// Ignore missing temp files during retries and cleanup.
}
}
private trimOptional(value?: string): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}
}

View File

@ -0,0 +1,28 @@
import { Injectable } from '@nestjs/common';
import { User } from '@prisma/client';
import { PrismaService } from '../../infrastructure/database/prisma.service';
@Injectable()
export class DefaultUserService {
static readonly defaultOwnerSlug = 'default-owner';
static readonly defaultOwnerDisplayName = 'Default Owner';
constructor(private readonly prismaService: PrismaService) {}
async getOrCreateDefaultUser(): Promise<User> {
return this.prismaService.user.upsert({
where: {
slug: DefaultUserService.defaultOwnerSlug,
},
update: {
displayName: DefaultUserService.defaultOwnerDisplayName,
isDefault: true,
},
create: {
slug: DefaultUserService.defaultOwnerSlug,
displayName: DefaultUserService.defaultOwnerDisplayName,
isDefault: true,
},
});
}
}

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { PrismaModule } from '../../infrastructure/database/prisma.module';
import { DefaultUserService } from './default-user.service';
@Module({
imports: [PrismaModule],
providers: [DefaultUserService],
exports: [DefaultUserService],
})
export class UsersModule {}

View File

@ -1,4 +1,8 @@
import { randomUUID } from 'node:crypto';
import { randomUUID, createHash } from 'node:crypto';
import { mkdtemp, readFile, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Readable } from 'node:stream';
import { INestApplication, ValidationPipe, VersioningType } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { AppModule } from '../../src/app.module';
@ -6,19 +10,65 @@ import { AppConfigService } from '../../src/modules/config/config.service';
import { DevicesController } from '../../src/modules/devices/devices.controller';
import { HealthController } from '../../src/modules/health/health.controller';
import { SyncController } from '../../src/modules/sync/sync.controller';
import { LocalFilesystemStorageService } from '../../src/modules/storage/storage.service';
import { UploadsController } from '../../src/modules/uploads/uploads.controller';
import { UploadsService } from '../../src/modules/uploads/uploads.service';
import { PrismaService } from '../../src/infrastructure/database/prisma.service';
function createPrismaMock() {
const devices = new Map<string, any>();
function sampleMp3Bytes(seed: string): Buffer {
return Buffer.concat([
Buffer.from('ID3', 'ascii'),
Buffer.from([0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x21]),
Buffer.from(seed, 'utf8'),
]);
}
return {
function sha256Hex(data: Buffer): string {
return createHash('sha256').update(data).digest('hex');
}
function createUploadRequest(data: Buffer): any {
const request = Readable.from([data]) as any;
request.headers = {
'content-type': 'audio/mpeg',
'content-length': String(data.length),
};
return request;
}
function createPrismaMock() {
const users = new Map<string, any>();
const devices = new Map<string, any>();
const tracks = new Map<string, any>();
const audioAssets = new Map<string, any>();
const uploadSessions = new Map<string, any>();
const libraryEvents = new Map<bigint, any>();
let nextLibraryEventId = 1n;
const defaultUser = {
id: randomUUID(),
slug: 'default-owner',
displayName: 'Default Owner',
isDefault: true,
createdAt: new Date(),
updatedAt: new Date(),
};
users.set(defaultUser.id, defaultUser);
const prismaMock: any = {
$queryRawUnsafe: jest.fn().mockResolvedValue([{ '?column?': 1 }]),
$transaction: jest.fn().mockImplementation(async (callback: any) => callback(prismaMock)),
user: {
upsert: jest.fn().mockResolvedValue(defaultUser),
},
device: {
create: jest.fn().mockImplementation(async ({ data }) => {
const id = randomUUID();
const record = { id, ...data };
devices.set(id, record);
const record = {
id: randomUUID(),
createdAt: new Date(),
updatedAt: new Date(),
...data,
};
devices.set(record.id, record);
return record;
}),
findUnique: jest.fn().mockImplementation(async ({ where }) => {
@ -26,25 +76,158 @@ function createPrismaMock() {
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = devices.get(where.id);
const updated = { ...current, ...data };
const updated = {
...current,
...data,
updatedAt: new Date(),
};
devices.set(where.id, updated);
return updated;
}),
},
track: {
findMany: jest.fn().mockImplementation(async ({ where }) => {
return [...tracks.values()]
.filter((track) => {
const userMatches = where?.userId ? track.userId === where.userId : true;
const statusMatches = where?.status ? track.status === where.status : true;
return userMatches && statusMatches;
})
.sort((lhs, rhs) => lhs.createdAt.getTime() - rhs.createdAt.getTime());
}),
findUnique: jest.fn().mockImplementation(async ({ where }) => {
return tracks.get(where.id) ?? null;
}),
create: jest.fn().mockImplementation(async ({ data }) => {
const now = new Date();
const record = {
id: randomUUID(),
primaryAudioAssetId: null,
artworkAssetId: null,
albumArtist: null,
genre: null,
discNumber: null,
trackNumber: null,
year: null,
deletedAt: null,
createdAt: now,
updatedAt: now,
...data,
};
tracks.set(record.id, record);
return record;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = tracks.get(where.id);
const updated = {
...current,
...data,
updatedAt: new Date(),
};
tracks.set(where.id, updated);
return updated;
}),
},
audioAsset: {
findUnique: jest.fn().mockResolvedValue(null),
findUnique: jest.fn().mockImplementation(async ({ where }) => {
if (where.id) {
return audioAssets.get(where.id) ?? null;
}
const composite = where.userId_sha256;
if (!composite) {
return null;
}
return (
[...audioAssets.values()].find(
(asset) =>
asset.userId === composite.userId &&
asset.sha256 === composite.sha256,
) ?? null
);
}),
create: jest.fn().mockImplementation(async ({ data }) => {
const record = {
id: randomUUID(),
bitRateKbps: null,
sampleRateHz: null,
channels: null,
createdAt: new Date(),
...data,
};
audioAssets.set(record.id, record);
return record;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = audioAssets.get(where.id);
const updated = {
...current,
...data,
};
audioAssets.set(where.id, updated);
return updated;
}),
},
uploadSession: {
create: jest.fn().mockImplementation(async ({ data }) => {
return {
id: randomUUID(),
const now = new Date();
const record = {
createdAt: now,
updatedAt: now,
completedAt: null,
finalizedAt: null,
trackId: null,
audioAssetId: null,
...data,
};
uploadSessions.set(record.id, record);
return record;
}),
findUnique: jest.fn().mockImplementation(async ({ where }) => {
return uploadSessions.get(where.id) ?? null;
}),
update: jest.fn().mockImplementation(async ({ where, data }) => {
const current = uploadSessions.get(where.id);
const updated = {
...current,
...data,
updatedAt: new Date(),
};
uploadSessions.set(where.id, updated);
return updated;
}),
findUnique: jest.fn().mockResolvedValue(null),
},
libraryEvent: {
findFirst: jest.fn().mockResolvedValue(null),
create: jest.fn().mockImplementation(async ({ data }) => {
const record = {
id: nextLibraryEventId,
payloadVersion: 1,
createdAt: new Date(),
...data,
};
libraryEvents.set(record.id, record);
nextLibraryEventId += 1n;
return record;
}),
findFirst: jest.fn().mockImplementation(async ({ where }) => {
const filteredEvents = [...libraryEvents.values()].filter((event) =>
where?.userId ? event.userId === where.userId : true,
);
return filteredEvents.sort((lhs, rhs) => Number(rhs.id - lhs.id))[0] ?? null;
}),
},
};
return {
prismaMock,
state: {
defaultUser,
devices,
tracks,
audioAssets,
uploadSessions,
libraryEvents,
},
};
}
@ -54,9 +237,16 @@ describe('Velody API wiring (e2e)', () => {
let healthController: HealthController;
let devicesController: DevicesController;
let syncController: SyncController;
let uploadsController: UploadsController;
let uploadsService: UploadsService;
let prismaState: ReturnType<typeof createPrismaMock>['state'];
let storageRoot: string;
beforeEach(async () => {
const prismaMock = createPrismaMock();
const { prismaMock, state } = createPrismaMock();
prismaState = state;
storageRoot = await mkdtemp(join(tmpdir(), 'velody-e2e-'));
const moduleRef = await Test.createTestingModule({
imports: [AppModule],
})
@ -64,15 +254,7 @@ describe('Velody API wiring (e2e)', () => {
.useValue({
appVersion: '0.1.0',
maxUploadSizeBytes: 1024 * 1024 * 1024,
storageRoot: '/tmp/velody-storage',
})
.overrideProvider(LocalFilesystemStorageService)
.useValue({
root: '/tmp/velody-storage',
checkReadiness: jest.fn().mockResolvedValue({
root: '/tmp/velody-storage',
writable: true,
}),
storageRoot,
})
.overrideProvider(PrismaService)
.useValue(prismaMock)
@ -93,12 +275,16 @@ describe('Velody API wiring (e2e)', () => {
healthController = moduleRef.get(HealthController);
devicesController = moduleRef.get(DevicesController);
syncController = moduleRef.get(SyncController);
uploadsController = moduleRef.get(UploadsController);
uploadsService = moduleRef.get(UploadsService);
});
afterEach(async () => {
if (app) {
await app.close();
}
await rm(storageRoot, { recursive: true, force: true });
});
it('returns health information', async () => {
@ -109,33 +295,25 @@ describe('Velody API wiring (e2e)', () => {
expect(response.version).toBe('0.1.0');
});
it('registers a device', async () => {
const response = await devicesController.register({
it('registers a device and accepts heartbeat', async () => {
const registerResponse = await devicesController.register({
platform: 'MACOS',
deviceName: 'Diya MacBook Pro',
appVersion: '0.1.0',
});
expect(response.deviceId).toBeDefined();
expect(response.bootstrapToken).toBeDefined();
});
expect(registerResponse.deviceId).toBeDefined();
expect(registerResponse.bootstrapToken).toBeDefined();
it('accepts device heartbeat', async () => {
const registerResponse = await devicesController.register({
platform: 'IPHONE',
deviceName: 'Diya iPhone',
appVersion: '0.1.0',
});
const response = await devicesController.heartbeat({
const heartbeatResponse = await devicesController.heartbeat({
deviceId: registerResponse.deviceId,
appVersion: '0.1.1',
});
expect(response.ok).toBe(true);
expect(heartbeatResponse.ok).toBe(true);
});
it('returns empty sync bootstrap and changes payloads', async () => {
it('returns sync bootstrap and changes payloads', async () => {
const bootstrapResponse = await syncController.bootstrap();
const changesResponse = await syncController.changes({ after: '0' });
@ -143,4 +321,59 @@ describe('Velody API wiring (e2e)', () => {
expect(changesResponse.events).toEqual([]);
expect(changesResponse.nextCursor).toBe('0');
});
it('supports the MP3 upload pipeline through the Nest app wiring', async () => {
const registerResponse = await devicesController.register({
platform: 'MACOS',
deviceName: 'Upload Mac',
appVersion: '0.1.0',
});
const bytes = sampleMp3Bytes('e2e-upload');
const sha256 = sha256Hex(bytes);
const prepareResponse = await uploadsController.prepare({
deviceId: registerResponse.deviceId,
sha256,
originalFilename: 'e2e-upload.mp3',
sizeBytes: bytes.length,
});
expect(prepareResponse.status).toBe('upload_required');
const uploadResponse = await uploadsService.uploadFile(
prepareResponse.uploadId!,
createUploadRequest(bytes),
);
expect(uploadResponse.status).toBe('COMPLETED');
const finalizeResponse = await uploadsController.finalize(
prepareResponse.uploadId!,
{
title: 'Uploaded Track',
artist: 'Velody',
album: 'Milestone 6',
durationMs: 222000,
},
);
expect(finalizeResponse.trackId).toBeDefined();
expect(finalizeResponse.assetId).toBeDefined();
const storedBytes = await readFile(
join(storageRoot, 'users', prismaState.defaultUser.id, 'audio', `${sha256}.mp3`),
);
expect(storedBytes.equals(bytes)).toBe(true);
const duplicatePrepare = await uploadsController.prepare({
deviceId: registerResponse.deviceId,
sha256,
originalFilename: 'e2e-upload.mp3',
sizeBytes: bytes.length,
});
expect(duplicatePrepare.status).toBe('exists');
expect(prismaState.audioAssets.size).toBe(1);
expect(prismaState.libraryEvents.size).toBe(1);
});
});

View File

@ -5,6 +5,14 @@ public enum DevicePlatform: String, Codable, Sendable, CaseIterable {
case iphone = "IPHONE"
}
public enum LocalUploadStatus: String, Codable, Hashable, Sendable, CaseIterable {
case localOnly
case preparing
case uploading
case uploaded
case failed
}
public struct LibraryTrack: Identifiable, Codable, Hashable, Sendable {
public let id: String
public var title: String
@ -13,6 +21,9 @@ public struct LibraryTrack: Identifiable, Codable, Hashable, Sendable {
public var durationSeconds: Double?
public var localFilePath: String
public var sha256: String?
public var uploadStatus: LocalUploadStatus?
public var remoteTrackId: String?
public var lastUploadError: String?
public init(
id: String = UUID().uuidString,
@ -21,7 +32,10 @@ public struct LibraryTrack: Identifiable, Codable, Hashable, Sendable {
album: String? = nil,
durationSeconds: Double? = nil,
localFilePath: String = "",
sha256: String? = nil
sha256: String? = nil,
uploadStatus: LocalUploadStatus? = nil,
remoteTrackId: String? = nil,
lastUploadError: String? = nil
) {
self.id = id
self.title = title
@ -30,6 +44,9 @@ public struct LibraryTrack: Identifiable, Codable, Hashable, Sendable {
self.durationSeconds = durationSeconds
self.localFilePath = localFilePath
self.sha256 = sha256
self.uploadStatus = uploadStatus
self.remoteTrackId = remoteTrackId
self.lastUploadError = lastUploadError
}
}
@ -91,6 +108,116 @@ public struct DeviceHeartbeatResponse: Codable, Hashable, Sendable {
}
}
public enum UploadPrepareStatus: String, Codable, Hashable, Sendable {
case exists
case uploadRequired = "upload_required"
}
public struct UploadPrepareRequest: Codable, Hashable, Sendable {
public var deviceId: String
public var sha256: String
public var originalFilename: String
public var sizeBytes: Int
public init(
deviceId: String,
sha256: String,
originalFilename: String,
sizeBytes: Int
) {
self.deviceId = deviceId
self.sha256 = sha256
self.originalFilename = originalFilename
self.sizeBytes = sizeBytes
}
}
public struct UploadPrepareResponse: Codable, Hashable, Sendable {
public var status: UploadPrepareStatus
public var uploadId: String?
public var nextOffset: Int?
public var trackId: String?
public var assetId: String?
public init(
status: UploadPrepareStatus,
uploadId: String? = nil,
nextOffset: Int? = nil,
trackId: String? = nil,
assetId: String? = nil
) {
self.status = status
self.uploadId = uploadId
self.nextOffset = nextOffset
self.trackId = trackId
self.assetId = assetId
}
}
public enum UploadSessionState: String, Codable, Hashable, Sendable {
case pending = "PENDING"
case readyToUpload = "READY_TO_UPLOAD"
case completed = "COMPLETED"
case failed = "FAILED"
}
public struct UploadSessionStatusResponse: Codable, Hashable, Sendable {
public var uploadId: String
public var status: UploadSessionState
public var receivedBytes: String
public var expectedSizeBytes: String
public var nextOffset: String
public var finalizedAt: String?
public init(
uploadId: String,
status: UploadSessionState,
receivedBytes: String,
expectedSizeBytes: String,
nextOffset: String,
finalizedAt: String? = nil
) {
self.uploadId = uploadId
self.status = status
self.receivedBytes = receivedBytes
self.expectedSizeBytes = expectedSizeBytes
self.nextOffset = nextOffset
self.finalizedAt = finalizedAt
}
}
public struct UploadFinalizeRequest: Codable, Hashable, Sendable {
public var title: String
public var artist: String
public var album: String?
public var durationMs: Int?
public init(
title: String,
artist: String,
album: String? = nil,
durationMs: Int? = nil
) {
self.title = title
self.artist = artist
self.album = album
self.durationMs = durationMs
}
}
public struct UploadFinalizeResponse: Codable, Hashable, Sendable {
public var trackId: String
public var assetId: String
public init(
trackId: String,
assetId: String
) {
self.trackId = trackId
self.assetId = assetId
}
}
public struct SyncCursor: Codable, Hashable, Sendable {
public var value: String

View File

@ -38,6 +38,25 @@ public protocol VelodyAPIClient: Sendable {
) async throws -> DeviceHeartbeatResponse
func fetchSyncBootstrap() async throws -> SyncBootstrapResponse
func prepareUpload(
_ payload: UploadPrepareRequest
) async throws -> UploadPrepareResponse
func fetchUploadStatus(
uploadId: String
) async throws -> UploadSessionStatusResponse
func uploadFile(
uploadId: String,
fileURL: URL,
mimeType: String
) async throws -> UploadSessionStatusResponse
func finalizeUpload(
uploadId: String,
payload: UploadFinalizeRequest
) async throws -> UploadFinalizeResponse
}
public struct URLSessionVelodyAPIClient: VelodyAPIClient {
@ -87,6 +106,71 @@ public struct URLSessionVelodyAPIClient: VelodyAPIClient {
)
}
public func prepareUpload(
_ payload: UploadPrepareRequest
) async throws -> UploadPrepareResponse {
try await sendRequest(
method: "POST",
pathComponents: ["api", "v1", "uploads", "prepare"],
body: payload,
responseType: UploadPrepareResponse.self
)
}
public func fetchUploadStatus(
uploadId: String
) async throws -> UploadSessionStatusResponse {
try await sendRequest(
method: "GET",
pathComponents: ["api", "v1", "uploads", uploadId],
responseType: UploadSessionStatusResponse.self
)
}
public func uploadFile(
uploadId: String,
fileURL: URL,
mimeType: String = "audio/mpeg"
) async throws -> UploadSessionStatusResponse {
guard FileManager.default.fileExists(atPath: fileURL.path) else {
throw VelodyAPIError.requestFailed("The selected file could not be found.")
}
let request = try buildRequest(
method: "PUT",
pathComponents: ["api", "v1", "uploads", uploadId, "file"],
bodyData: nil,
contentType: mimeType
)
let data: Data
let response: URLResponse
do {
(data, response) = try await session.upload(for: request, fromFile: fileURL)
} catch {
throw VelodyAPIError.requestFailed(error.localizedDescription)
}
return try decodeResponse(
data: data,
response: response,
responseType: UploadSessionStatusResponse.self
)
}
public func finalizeUpload(
uploadId: String,
payload: UploadFinalizeRequest
) async throws -> UploadFinalizeResponse {
try await sendRequest(
method: "POST",
pathComponents: ["api", "v1", "uploads", uploadId, "finalize"],
body: payload,
responseType: UploadFinalizeResponse.self
)
}
private func sendRequest<Response: Decodable>(
method: String,
pathComponents: [String],
@ -118,7 +202,8 @@ public struct URLSessionVelodyAPIClient: VelodyAPIClient {
let request = try buildRequest(
method: method,
pathComponents: pathComponents,
bodyData: bodyData
bodyData: bodyData,
contentType: "application/json"
)
return try await execute(request, responseType: responseType)
@ -127,7 +212,8 @@ public struct URLSessionVelodyAPIClient: VelodyAPIClient {
private func buildRequest(
method: String,
pathComponents: [String],
bodyData: Data?
bodyData: Data?,
contentType: String? = nil
) throws -> URLRequest {
guard let url = endpointURL(pathComponents: pathComponents) else {
throw VelodyAPIError.invalidServerURL(environment.baseURL.absoluteString)
@ -139,7 +225,10 @@ public struct URLSessionVelodyAPIClient: VelodyAPIClient {
if let bodyData {
request.httpBody = bodyData
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
}
if let contentType {
request.setValue(contentType, forHTTPHeaderField: "Content-Type")
}
return request
@ -158,6 +247,28 @@ public struct URLSessionVelodyAPIClient: VelodyAPIClient {
throw VelodyAPIError.requestFailed(error.localizedDescription)
}
return try decodeResponse(
data: data,
response: response,
responseType: responseType
)
}
private func decodeResponse<Response: Decodable>(
data: Data,
response: URLResponse,
responseType: Response.Type
) throws -> Response {
try validate(response: response, data: data)
do {
return try decoder.decode(responseType, from: data)
} catch {
throw VelodyAPIError.decodingFailed(error.localizedDescription)
}
}
private func validate(response: URLResponse, data: Data) throws {
guard let httpResponse = response as? HTTPURLResponse else {
throw VelodyAPIError.invalidResponse
}
@ -171,12 +282,6 @@ public struct URLSessionVelodyAPIClient: VelodyAPIClient {
message: message?.isEmpty == true ? nil : message
)
}
do {
return try decoder.decode(responseType, from: data)
} catch {
throw VelodyAPIError.decodingFailed(error.localizedDescription)
}
}
private func endpointURL(pathComponents: [String]) -> URL? {
@ -234,4 +339,58 @@ public struct StubVelodyAPIClient: VelodyAPIClient {
serverTime: ISO8601DateFormatter().string(from: .now)
)
}
public func prepareUpload(
_ payload: UploadPrepareRequest
) async throws -> UploadPrepareResponse {
_ = payload
return UploadPrepareResponse(
status: .uploadRequired,
uploadId: UUID().uuidString,
nextOffset: 0
)
}
public func fetchUploadStatus(
uploadId: String
) async throws -> UploadSessionStatusResponse {
UploadSessionStatusResponse(
uploadId: uploadId,
status: .completed,
receivedBytes: "0",
expectedSizeBytes: "0",
nextOffset: "0"
)
}
public func uploadFile(
uploadId: String,
fileURL: URL,
mimeType: String
) async throws -> UploadSessionStatusResponse {
_ = fileURL
_ = mimeType
return UploadSessionStatusResponse(
uploadId: uploadId,
status: .completed,
receivedBytes: "0",
expectedSizeBytes: "0",
nextOffset: "0"
)
}
public func finalizeUpload(
uploadId: String,
payload: UploadFinalizeRequest
) async throws -> UploadFinalizeResponse {
_ = uploadId
_ = payload
return UploadFinalizeResponse(
trackId: UUID().uuidString,
assetId: UUID().uuidString
)
}
}

View File

@ -15,6 +15,9 @@ public struct LocalTrack: Identifiable, Codable, Hashable, Sendable {
public var durationSeconds: Double?
public var localFilePath: String
public var sha256: String?
public var uploadStatus: LocalUploadStatus
public var remoteTrackId: String?
public var lastUploadError: String?
public var fileModifiedAt: Date?
public var lastScannedAt: Date?
public var isDeleted: Bool
@ -31,6 +34,9 @@ public struct LocalTrack: Identifiable, Codable, Hashable, Sendable {
durationSeconds: Double? = nil,
localFilePath: String = "",
sha256: String? = nil,
uploadStatus: LocalUploadStatus = .localOnly,
remoteTrackId: String? = nil,
lastUploadError: String? = nil,
fileModifiedAt: Date? = nil,
lastScannedAt: Date? = nil,
isDeleted: Bool = false,
@ -46,6 +52,9 @@ public struct LocalTrack: Identifiable, Codable, Hashable, Sendable {
self.durationSeconds = durationSeconds
self.localFilePath = localFilePath
self.sha256 = sha256
self.uploadStatus = uploadStatus
self.remoteTrackId = remoteTrackId
self.lastUploadError = lastUploadError
self.fileModifiedAt = fileModifiedAt
self.lastScannedAt = lastScannedAt
self.isDeleted = isDeleted
@ -68,6 +77,9 @@ public struct LocalTrack: Identifiable, Codable, Hashable, Sendable {
durationSeconds: scannedTrack.durationSeconds,
localFilePath: scannedTrack.localFilePath,
sha256: scannedTrack.sha256,
uploadStatus: .localOnly,
remoteTrackId: nil,
lastUploadError: nil,
fileModifiedAt: scannedTrack.fileModifiedAt,
lastScannedAt: observedAt,
isDeleted: false,
@ -91,6 +103,9 @@ public struct LocalTrack: Identifiable, Codable, Hashable, Sendable {
durationSeconds: libraryTrack.durationSeconds,
localFilePath: libraryTrack.localFilePath,
sha256: libraryTrack.sha256,
uploadStatus: libraryTrack.uploadStatus ?? .localOnly,
remoteTrackId: libraryTrack.remoteTrackId,
lastUploadError: libraryTrack.lastUploadError,
fileModifiedAt: nil,
lastScannedAt: origin == .localScan ? observedAt : nil,
isDeleted: false,
@ -116,7 +131,10 @@ public struct LocalTrack: Identifiable, Codable, Hashable, Sendable {
album: album,
durationSeconds: durationSeconds,
localFilePath: localFilePath,
sha256: sha256
sha256: sha256,
uploadStatus: uploadStatus,
remoteTrackId: remoteTrackId,
lastUploadError: lastUploadError
)
}

View File

@ -1,5 +1,6 @@
import Foundation
import SwiftData
import VelodyDomain
@Model
final class TrackEntity {
@ -12,6 +13,9 @@ final class TrackEntity {
var durationSeconds: Double?
var localFilePath: String
var sha256: String?
var uploadStatusRawValue: String?
var remoteTrackID: String?
var lastUploadError: String?
var fileModifiedAt: Date?
var lastScannedAt: Date?
var isMarkedDeleted: Bool
@ -29,6 +33,9 @@ final class TrackEntity {
durationSeconds = track.durationSeconds
localFilePath = track.localFilePath
sha256 = track.sha256
uploadStatusRawValue = track.uploadStatus.rawValue
remoteTrackID = track.remoteTrackId
lastUploadError = track.lastUploadError
fileModifiedAt = track.fileModifiedAt
lastScannedAt = track.lastScannedAt
isMarkedDeleted = track.isDeleted
@ -47,6 +54,9 @@ final class TrackEntity {
durationSeconds: durationSeconds,
localFilePath: localFilePath,
sha256: sha256,
uploadStatus: LocalUploadStatus(rawValue: uploadStatusRawValue ?? "") ?? .localOnly,
remoteTrackId: remoteTrackID,
lastUploadError: lastUploadError,
fileModifiedAt: fileModifiedAt,
lastScannedAt: lastScannedAt,
isDeleted: isMarkedDeleted,
@ -66,6 +76,9 @@ final class TrackEntity {
durationSeconds = track.durationSeconds
localFilePath = track.localFilePath
sha256 = track.sha256
uploadStatusRawValue = track.uploadStatus.rawValue
remoteTrackID = track.remoteTrackId
lastUploadError = track.lastUploadError
fileModifiedAt = track.fileModifiedAt
lastScannedAt = track.lastScannedAt
isMarkedDeleted = track.isDeleted