improve sync batching; added playback types
This commit is contained in:
@@ -9,4 +9,5 @@ import "Protos/types.proto";
|
|||||||
|
|
||||||
service AZKi{
|
service AZKi{
|
||||||
rpc GetMediaEntriesInRange(MediaRangeRequest) returns (MediaList);
|
rpc GetMediaEntriesInRange(MediaRangeRequest) returns (MediaList);
|
||||||
|
rpc GetMediaPlayback(MediaPlaybackRequest) returns (PlaybackInfo);
|
||||||
}
|
}
|
||||||
@@ -23,6 +23,10 @@ message MediaRangeRequest{
|
|||||||
google.protobuf.Timestamp to = 3;
|
google.protobuf.Timestamp to = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message MediaPlaybackRequest{
|
||||||
|
google.protobuf.Timestamp date = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message MediaEntry {
|
message MediaEntry {
|
||||||
int32 version = 1;
|
int32 version = 1;
|
||||||
string id = 2;
|
string id = 2;
|
||||||
|
|||||||
@@ -18,4 +18,29 @@ public class AZKiRpcService(MediaService mediaService) : RPC.AZKi.AZKiBase
|
|||||||
result.Entries.AddRange(items.Select(e => e.ToRpc()));
|
result.Entries.AddRange(items.Select(e => e.ToRpc()));
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public override async Task<PlaybackInfo> GetMediaPlayback(MediaPlaybackRequest request, ServerCallContext context)
|
||||||
|
{
|
||||||
|
var from = request.Date.ToDateTime().Date;
|
||||||
|
var to = request.Date.ToDateTime().Date.AddDays(1);
|
||||||
|
var items = await mediaService.GetEntriesInRangeAsync(Models.MediaType.All, from, to);
|
||||||
|
var channels = items.GroupBy(i => i.CameraId).Select(c =>
|
||||||
|
{
|
||||||
|
var images = c.Where(m => m.Type == Models.MediaType.Image).Select(m => m.ToRpc());
|
||||||
|
var videos = c.Where(m => m.Type == Models.MediaType.Video).Select(m => m.ToRpc());
|
||||||
|
var result = new MediaChannel
|
||||||
|
{
|
||||||
|
CameraId = c.Key,
|
||||||
|
};
|
||||||
|
result.Images.AddRange(images);
|
||||||
|
result.Videos.AddRange(videos);
|
||||||
|
return result;
|
||||||
|
});
|
||||||
|
var playback = new PlaybackInfo
|
||||||
|
{
|
||||||
|
Date = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(from),
|
||||||
|
};
|
||||||
|
playback.Channels.AddRange(channels);
|
||||||
|
return playback;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,9 @@ using MaybeError.Errors;
|
|||||||
|
|
||||||
using SixLabors.ImageSharp;
|
using SixLabors.ImageSharp;
|
||||||
|
|
||||||
|
using System.Collections.Frozen;
|
||||||
|
using System.IO;
|
||||||
|
|
||||||
namespace AZKiServer.Services;
|
namespace AZKiServer.Services;
|
||||||
|
|
||||||
public class FileScannerService(MediaService mediaService, IConfiguration config, ILogger<FileScannerService> logger) : IHostedService, IDisposable
|
public class FileScannerService(MediaService mediaService, IConfiguration config, ILogger<FileScannerService> logger) : IHostedService, IDisposable
|
||||||
@@ -50,49 +53,11 @@ public class FileScannerService(MediaService mediaService, IConfiguration config
|
|||||||
{
|
{
|
||||||
var files = Directory.GetFiles(path, "*", SearchOption.AllDirectories);
|
var files = Directory.GetFiles(path, "*", SearchOption.AllDirectories);
|
||||||
var existingFiles = await mediaService.GetExistingFilePathsAsync(cancellationToken);
|
var existingFiles = await mediaService.GetExistingFilePathsAsync(cancellationToken);
|
||||||
var entries = new List<MediaEntry>();
|
var total = 0;
|
||||||
var upgradeEntries = new List<MediaEntry>();
|
foreach (var chunk in files.Chunk(50))
|
||||||
foreach (var filePath in files)
|
|
||||||
{
|
{
|
||||||
if (cancellationToken.IsCancellationRequested)
|
total += await ScanFileChunkAsync(path, chunk, existingFiles, cancellationToken);
|
||||||
break;
|
logger.LogInformation("Added {updated} of {count}", total, existingFiles.Count);
|
||||||
var relativePath = Path.GetRelativePath(path, filePath);
|
|
||||||
if (relativePath[0] == '.') //Ignore hidden folders
|
|
||||||
continue;
|
|
||||||
var isUpgrade = false;
|
|
||||||
if (existingFiles.TryGetValue(relativePath, out var version))
|
|
||||||
{
|
|
||||||
if(version < MediaEntry.CUR_VERSION)
|
|
||||||
continue;
|
|
||||||
isUpgrade = true;
|
|
||||||
}
|
|
||||||
var metadata = ReadMetadata(filePath);
|
|
||||||
if(metadata.HasError)
|
|
||||||
{
|
|
||||||
logger.LogError(metadata.Error.GetException(), $"Failed to get metadata for file: {filePath}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
var entry = MediaEntry.Parse(relativePath, metadata);
|
|
||||||
if(entry.HasError)
|
|
||||||
{
|
|
||||||
logger.LogError(entry.Error.GetException(), "Failed to parse file data");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if(isUpgrade)
|
|
||||||
upgradeEntries.Add(entry);
|
|
||||||
else
|
|
||||||
entries.Add(entry);
|
|
||||||
}
|
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
|
||||||
if(entries.Count > 0) {
|
|
||||||
await mediaService.AddMediaBulkAsync(entries, cancellationToken);
|
|
||||||
logger.LogInformation("Added {count} file entries", entries.Count);
|
|
||||||
}
|
|
||||||
if (upgradeEntries.Count > 0)
|
|
||||||
{
|
|
||||||
await mediaService.DeleteAllEntriesAsync(upgradeEntries.Select(e => e.Filepath), cancellationToken);
|
|
||||||
await mediaService.AddMediaBulkAsync(upgradeEntries, cancellationToken);
|
|
||||||
logger.LogInformation("Upgraded {count} file entries", entries.Count);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
@@ -101,6 +66,56 @@ public class FileScannerService(MediaService mediaService, IConfiguration config
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async Task<int> ScanFileChunkAsync(string path, IEnumerable<string> files, FrozenDictionary<string, int> existingFiles, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var entries = new List<MediaEntry>();
|
||||||
|
var upgradeEntries = new List<MediaEntry>();
|
||||||
|
foreach (var filePath in files)
|
||||||
|
{
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
break;
|
||||||
|
var relativePath = Path.GetRelativePath(path, filePath);
|
||||||
|
if (relativePath[0] == '.') //Ignore hidden folders
|
||||||
|
continue;
|
||||||
|
var isUpgrade = false;
|
||||||
|
if (existingFiles.TryGetValue(relativePath, out var version))
|
||||||
|
{
|
||||||
|
if (version < MediaEntry.CUR_VERSION)
|
||||||
|
continue;
|
||||||
|
isUpgrade = true;
|
||||||
|
}
|
||||||
|
var metadata = ReadMetadata(filePath);
|
||||||
|
if (metadata.HasError)
|
||||||
|
{
|
||||||
|
logger.LogError(metadata.Error.GetException(), $"Failed to get metadata for file: {filePath}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
var entry = MediaEntry.Parse(relativePath, metadata);
|
||||||
|
if (entry.HasError)
|
||||||
|
{
|
||||||
|
logger.LogError(entry.Error.GetException(), "Failed to parse file data");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (isUpgrade)
|
||||||
|
upgradeEntries.Add(entry);
|
||||||
|
else
|
||||||
|
entries.Add(entry);
|
||||||
|
}
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
if (entries.Count > 0)
|
||||||
|
{
|
||||||
|
await mediaService.AddMediaBulkAsync(entries, cancellationToken);
|
||||||
|
logger.LogInformation("Added {count} file entries", entries.Count);
|
||||||
|
}
|
||||||
|
if (upgradeEntries.Count > 0)
|
||||||
|
{
|
||||||
|
await mediaService.DeleteAllEntriesAsync(upgradeEntries.Select(e => e.Filepath), cancellationToken);
|
||||||
|
await mediaService.AddMediaBulkAsync(upgradeEntries, cancellationToken);
|
||||||
|
logger.LogInformation("Upgraded {count} file entries", entries.Count);
|
||||||
|
}
|
||||||
|
return entries.Count + upgradeEntries.Count;
|
||||||
|
}
|
||||||
|
|
||||||
private static Maybe<MediaMetadata> ReadMetadata(string filePath)
|
private static Maybe<MediaMetadata> ReadMetadata(string filePath)
|
||||||
{
|
{
|
||||||
var ext = Path.GetExtension(filePath);
|
var ext = Path.GetExtension(filePath);
|
||||||
|
|||||||
@@ -4,10 +4,7 @@ use prost_types::Timestamp;
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
components::playback::{Timeline, Viewport},
|
components::playback::{Timeline, Viewport},
|
||||||
rpc::{
|
rpc::{azki::MediaPlaybackRequest, get_rpc_client},
|
||||||
azki::{MediaRangeRequest, MediaType},
|
|
||||||
get_rpc_client,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
const PLAYER_CSS: Asset = asset!("/assets/styling/player.scss");
|
const PLAYER_CSS: Asset = asset!("/assets/styling/player.scss");
|
||||||
|
|
||||||
@@ -17,19 +14,12 @@ pub fn Player() -> Element {
|
|||||||
let mut client = get_rpc_client();
|
let mut client = get_rpc_client();
|
||||||
let now = Local::now();
|
let now = Local::now();
|
||||||
let from = Timestamp::date(now.year() as i64, now.month() as u8, now.day() as u8).unwrap();
|
let from = Timestamp::date(now.year() as i64, now.month() as u8, now.day() as u8).unwrap();
|
||||||
let tomorrow = now.checked_add_days(Days::new(1)).unwrap();
|
|
||||||
let to = Timestamp::date(tomorrow.year() as i64, tomorrow.month() as u8, tomorrow.day() as u8).unwrap();
|
|
||||||
let result = client
|
let result = client
|
||||||
.get_media_entries_in_range(MediaRangeRequest {
|
.get_media_playback(MediaPlaybackRequest { date: Some(from) })
|
||||||
r#type: MediaType::Image.into(),
|
|
||||||
from: Some(from),
|
|
||||||
to: Some(to),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.await;
|
.await;
|
||||||
if let Ok(entries) = result {
|
if let Ok(entries) = result {
|
||||||
let res = entries.into_inner();
|
let res = entries.into_inner();
|
||||||
return Ok(res.entries);
|
return Ok(res);
|
||||||
} else {
|
} else {
|
||||||
let err = result.err().unwrap();
|
let err = result.err().unwrap();
|
||||||
let msg = err.message();
|
let msg = err.message();
|
||||||
@@ -38,7 +28,7 @@ pub fn Player() -> Element {
|
|||||||
});
|
});
|
||||||
let len = match entries.cloned() {
|
let len = match entries.cloned() {
|
||||||
Some(value) => match value {
|
Some(value) => match value {
|
||||||
Ok(result) => result.len().to_string(),
|
Ok(result) => result.channels.len().to_string(),
|
||||||
Err(err) => err,
|
Err(err) => err,
|
||||||
},
|
},
|
||||||
_ => "Not Loaded".to_string(),
|
_ => "Not Loaded".to_string(),
|
||||||
|
|||||||
Reference in New Issue
Block a user