From c3ddcf16bf7bca74f597b045a32c784de9a47be5 Mon Sep 17 00:00:00 2001 From: Amatsugu Date: Wed, 28 Jan 2026 14:19:32 -0500 Subject: [PATCH] improve sync batching; added playback types --- AZKiServer/Protos/azki.proto | 1 + AZKiServer/Protos/types.proto | 4 + AZKiServer/Services/AZKiRpcService.cs | 25 ++++++ AZKiServer/Services/FileScannerService.cs | 99 +++++++++++++---------- client/src/components/playback/player.rs | 18 +---- 5 files changed, 91 insertions(+), 56 deletions(-) diff --git a/AZKiServer/Protos/azki.proto b/AZKiServer/Protos/azki.proto index 6673434..7cfe333 100644 --- a/AZKiServer/Protos/azki.proto +++ b/AZKiServer/Protos/azki.proto @@ -9,4 +9,5 @@ import "Protos/types.proto"; service AZKi{ rpc GetMediaEntriesInRange(MediaRangeRequest) returns (MediaList); + rpc GetMediaPlayback(MediaPlaybackRequest) returns (PlaybackInfo); } \ No newline at end of file diff --git a/AZKiServer/Protos/types.proto b/AZKiServer/Protos/types.proto index 684e97a..3cc6372 100644 --- a/AZKiServer/Protos/types.proto +++ b/AZKiServer/Protos/types.proto @@ -23,6 +23,10 @@ message MediaRangeRequest{ google.protobuf.Timestamp to = 3; } +message MediaPlaybackRequest{ + google.protobuf.Timestamp date = 1; +} + message MediaEntry { int32 version = 1; string id = 2; diff --git a/AZKiServer/Services/AZKiRpcService.cs b/AZKiServer/Services/AZKiRpcService.cs index e0e44b7..69cfb96 100644 --- a/AZKiServer/Services/AZKiRpcService.cs +++ b/AZKiServer/Services/AZKiRpcService.cs @@ -18,4 +18,29 @@ public class AZKiRpcService(MediaService mediaService) : RPC.AZKi.AZKiBase result.Entries.AddRange(items.Select(e => e.ToRpc())); return result; } + + public override async Task GetMediaPlayback(MediaPlaybackRequest request, ServerCallContext context) + { + var from = request.Date.ToDateTime().Date; + var to = request.Date.ToDateTime().Date.AddDays(1); + var items = await mediaService.GetEntriesInRangeAsync(Models.MediaType.All, from, to); + var channels = items.GroupBy(i => i.CameraId).Select(c => + { + var images = c.Where(m => m.Type == Models.MediaType.Image).Select(m => m.ToRpc()); + var videos = c.Where(m => m.Type == Models.MediaType.Video).Select(m => m.ToRpc()); + var result = new MediaChannel + { + CameraId = c.Key, + }; + result.Images.AddRange(images); + result.Videos.AddRange(videos); + return result; + }); + var playback = new PlaybackInfo + { + Date = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(from), + }; + playback.Channels.AddRange(channels); + return playback; + } } diff --git a/AZKiServer/Services/FileScannerService.cs b/AZKiServer/Services/FileScannerService.cs index ccc989c..047d71a 100644 --- a/AZKiServer/Services/FileScannerService.cs +++ b/AZKiServer/Services/FileScannerService.cs @@ -8,6 +8,9 @@ using MaybeError.Errors; using SixLabors.ImageSharp; +using System.Collections.Frozen; +using System.IO; + namespace AZKiServer.Services; public class FileScannerService(MediaService mediaService, IConfiguration config, ILogger logger) : IHostedService, IDisposable @@ -50,49 +53,11 @@ public class FileScannerService(MediaService mediaService, IConfiguration config { var files = Directory.GetFiles(path, "*", SearchOption.AllDirectories); var existingFiles = await mediaService.GetExistingFilePathsAsync(cancellationToken); - var entries = new List(); - var upgradeEntries = new List(); - foreach (var filePath in files) + var total = 0; + foreach (var chunk in files.Chunk(50)) { - if (cancellationToken.IsCancellationRequested) - break; - var relativePath = Path.GetRelativePath(path, filePath); - if (relativePath[0] == '.') //Ignore hidden folders - continue; - var isUpgrade = false; - if (existingFiles.TryGetValue(relativePath, out var version)) - { - if(version < MediaEntry.CUR_VERSION) - continue; - isUpgrade = true; - } - var metadata = ReadMetadata(filePath); - if(metadata.HasError) - { - logger.LogError(metadata.Error.GetException(), $"Failed to get metadata for file: {filePath}"); - continue; - } - var entry = MediaEntry.Parse(relativePath, metadata); - if(entry.HasError) - { - logger.LogError(entry.Error.GetException(), "Failed to parse file data"); - continue; - } - if(isUpgrade) - upgradeEntries.Add(entry); - else - entries.Add(entry); - } - cancellationToken.ThrowIfCancellationRequested(); - if(entries.Count > 0) { - await mediaService.AddMediaBulkAsync(entries, cancellationToken); - logger.LogInformation("Added {count} file entries", entries.Count); - } - if (upgradeEntries.Count > 0) - { - await mediaService.DeleteAllEntriesAsync(upgradeEntries.Select(e => e.Filepath), cancellationToken); - await mediaService.AddMediaBulkAsync(upgradeEntries, cancellationToken); - logger.LogInformation("Upgraded {count} file entries", entries.Count); + total += await ScanFileChunkAsync(path, chunk, existingFiles, cancellationToken); + logger.LogInformation("Added {updated} of {count}", total, existingFiles.Count); } } catch (Exception ex) @@ -101,6 +66,56 @@ public class FileScannerService(MediaService mediaService, IConfiguration config } } + private async Task ScanFileChunkAsync(string path, IEnumerable files, FrozenDictionary existingFiles, CancellationToken cancellationToken = default) + { + var entries = new List(); + var upgradeEntries = new List(); + foreach (var filePath in files) + { + if (cancellationToken.IsCancellationRequested) + break; + var relativePath = Path.GetRelativePath(path, filePath); + if (relativePath[0] == '.') //Ignore hidden folders + continue; + var isUpgrade = false; + if (existingFiles.TryGetValue(relativePath, out var version)) + { + if (version < MediaEntry.CUR_VERSION) + continue; + isUpgrade = true; + } + var metadata = ReadMetadata(filePath); + if (metadata.HasError) + { + logger.LogError(metadata.Error.GetException(), $"Failed to get metadata for file: {filePath}"); + continue; + } + var entry = MediaEntry.Parse(relativePath, metadata); + if (entry.HasError) + { + logger.LogError(entry.Error.GetException(), "Failed to parse file data"); + continue; + } + if (isUpgrade) + upgradeEntries.Add(entry); + else + entries.Add(entry); + } + cancellationToken.ThrowIfCancellationRequested(); + if (entries.Count > 0) + { + await mediaService.AddMediaBulkAsync(entries, cancellationToken); + logger.LogInformation("Added {count} file entries", entries.Count); + } + if (upgradeEntries.Count > 0) + { + await mediaService.DeleteAllEntriesAsync(upgradeEntries.Select(e => e.Filepath), cancellationToken); + await mediaService.AddMediaBulkAsync(upgradeEntries, cancellationToken); + logger.LogInformation("Upgraded {count} file entries", entries.Count); + } + return entries.Count + upgradeEntries.Count; + } + private static Maybe ReadMetadata(string filePath) { var ext = Path.GetExtension(filePath); diff --git a/client/src/components/playback/player.rs b/client/src/components/playback/player.rs index 72e4242..e469792 100644 --- a/client/src/components/playback/player.rs +++ b/client/src/components/playback/player.rs @@ -4,10 +4,7 @@ use prost_types::Timestamp; use crate::{ components::playback::{Timeline, Viewport}, - rpc::{ - azki::{MediaRangeRequest, MediaType}, - get_rpc_client, - }, + rpc::{azki::MediaPlaybackRequest, get_rpc_client}, }; const PLAYER_CSS: Asset = asset!("/assets/styling/player.scss"); @@ -17,19 +14,12 @@ pub fn Player() -> Element { let mut client = get_rpc_client(); let now = Local::now(); let from = Timestamp::date(now.year() as i64, now.month() as u8, now.day() as u8).unwrap(); - let tomorrow = now.checked_add_days(Days::new(1)).unwrap(); - let to = Timestamp::date(tomorrow.year() as i64, tomorrow.month() as u8, tomorrow.day() as u8).unwrap(); let result = client - .get_media_entries_in_range(MediaRangeRequest { - r#type: MediaType::Image.into(), - from: Some(from), - to: Some(to), - ..Default::default() - }) + .get_media_playback(MediaPlaybackRequest { date: Some(from) }) .await; if let Ok(entries) = result { let res = entries.into_inner(); - return Ok(res.entries); + return Ok(res); } else { let err = result.err().unwrap(); let msg = err.message(); @@ -38,7 +28,7 @@ pub fn Player() -> Element { }); let len = match entries.cloned() { Some(value) => match value { - Ok(result) => result.len().to_string(), + Ok(result) => result.channels.len().to_string(), Err(err) => err, }, _ => "Not Loaded".to_string(),