perf(engine): share Tokio runtime across all network tasks (M-16)

Replace per-call new_current_thread() runtimes with a single
TokioRuntimeResource(Arc<Runtime>) built once at startup using
new_multi_thread(worker_threads(2)). The Arc is cloned cheaply into
each AsyncComputeTaskPool closure, eliminating repeated OS thread
allocation on every sync pull/push, auth, avatar fetch, and analytics
flush. Using a multi-threaded runtime ensures concurrent block_on calls
from different worker threads are safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
funman300
2026-05-17 20:58:51 -07:00
parent 7fc98f8801
commit ffed6b27e9
5 changed files with 85 additions and 87 deletions
+10 -14
View File
@@ -12,7 +12,7 @@ use solitaire_core::game_state::GameMode;
use solitaire_data::{matomo_client::MatomoClient, settings::SyncBackend, Settings};
use crate::events::{AchievementUnlockedEvent, ForfeitEvent, GameWonEvent, NewGameRequestEvent};
use crate::resources::GameStateResource;
use crate::resources::{GameStateResource, TokioRuntimeResource};
use crate::settings_plugin::{SettingsChangedEvent, SettingsResource};
// ---------------------------------------------------------------------------
@@ -45,6 +45,7 @@ pub struct AnalyticsPlugin;
impl Plugin for AnalyticsPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<AnalyticsResource>()
.init_resource::<TokioRuntimeResource>()
.add_systems(Startup, init_analytics)
.add_systems(
Update,
@@ -80,28 +81,28 @@ fn react_to_settings_change(
fn on_game_won(
mut wins: MessageReader<GameWonEvent>,
analytics: Res<AnalyticsResource>,
settings: Res<SettingsResource>,
rt: Res<TokioRuntimeResource>,
) {
let Some(client) = analytics.client.clone() else {
return;
};
for ev in wins.read() {
client.event("Game", "Won", None, Some(ev.score as f64));
fire_flush(client.clone(), &settings.0);
fire_flush(client.clone(), rt.0.clone());
}
}
fn on_forfeit(
mut forfeits: MessageReader<ForfeitEvent>,
analytics: Res<AnalyticsResource>,
settings: Res<SettingsResource>,
rt: Res<TokioRuntimeResource>,
) {
let Some(client) = analytics.client.clone() else {
return;
};
for _ev in forfeits.read() {
client.event("Game", "Forfeit", None, None);
fire_flush(client.clone(), &settings.0);
fire_flush(client.clone(), rt.0.clone());
}
}
@@ -137,14 +138,14 @@ fn on_achievement_unlocked(
fn tick_flush_timer(
time: Res<Time>,
mut analytics: ResMut<AnalyticsResource>,
settings: Res<SettingsResource>,
rt: Res<TokioRuntimeResource>,
) {
analytics.flush_timer.tick(time.delta());
if !analytics.flush_timer.just_finished() {
return;
}
if let Some(client) = analytics.client.clone() {
fire_flush(client, &settings.0);
fire_flush(client, rt.0.clone());
}
}
@@ -164,15 +165,10 @@ fn client_for(settings: &Settings) -> Option<Arc<MatomoClient>> {
Some(Arc::new(MatomoClient::new(url, settings.matomo_site_id, uid)))
}
fn fire_flush(client: Arc<MatomoClient>, _settings: &Settings) {
fn fire_flush(client: Arc<MatomoClient>, rt: Arc<tokio::runtime::Runtime>) {
AsyncComputeTaskPool::get()
.spawn(async move {
if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
rt.block_on(client.flush());
}
rt.block_on(client.flush());
})
.detach();
}
+17 -16
View File
@@ -21,6 +21,8 @@ use bevy::asset::RenderAssetUsages;
use bevy::prelude::*;
use bevy::tasks::{futures_lite::future, AsyncComputeTaskPool, Task};
use crate::resources::TokioRuntimeResource;
/// Stores the loaded avatar [`Handle<Image>`], or `None` when no avatar
/// has been fetched yet (new account, no internet, or fetch in progress).
#[derive(Resource, Default)]
@@ -46,6 +48,7 @@ pub struct AvatarPlugin;
impl Plugin for AvatarPlugin {
fn build(&self, app: &mut App) {
app.add_message::<AvatarFetchEvent>()
.init_resource::<TokioRuntimeResource>()
.init_resource::<AvatarResource>()
.init_resource::<PendingAvatarTask>()
.add_systems(Update, (handle_avatar_fetch, poll_avatar_task));
@@ -54,28 +57,26 @@ impl Plugin for AvatarPlugin {
fn handle_avatar_fetch(
mut events: MessageReader<AvatarFetchEvent>,
rt: Res<TokioRuntimeResource>,
mut pending: ResMut<PendingAvatarTask>,
) {
for ev in events.read() {
// Cancel any in-flight task and restart with the new URL.
let url = ev.url.clone();
let rt = rt.0.clone();
pending.0 = Some(AsyncComputeTaskPool::get().spawn(async move {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.ok()?
.block_on(async move {
let client = reqwest::Client::new();
let bytes = client
.get(&url)
.send()
.await
.ok()?
.bytes()
.await
.ok()?;
Some(bytes.to_vec())
})
rt.block_on(async move {
let client = reqwest::Client::new();
let bytes = client
.get(&url)
.send()
.await
.ok()?
.bytes()
.await
.ok()?;
Some(bytes.to_vec())
})
}));
}
}
+25
View File
@@ -1,5 +1,7 @@
//! Bevy resources owned by the engine crate.
use std::sync::Arc;
use bevy::math::Vec2;
use bevy::prelude::Resource;
use chrono::{DateTime, Utc};
@@ -111,3 +113,26 @@ pub struct HintCycleIndex(pub usize);
/// returns to the same position in the list without re-scrolling.
#[derive(Resource, Debug, Clone, Default)]
pub struct SettingsScrollPos(pub f32);
/// Shared Tokio runtime used by all async-task closures that need HTTP I/O.
///
/// Bevy's `AsyncComputeTaskPool` uses `async-executor` (not Tokio), so spawned
/// closures that call `reqwest`/`hyper` need a Tokio reactor. A single
/// multi-threaded runtime is built once at startup and its `Arc` cloned cheaply
/// into every network task — safe for concurrent `block_on` calls from multiple
/// worker threads.
#[derive(Resource, Clone)]
pub struct TokioRuntimeResource(pub Arc<tokio::runtime::Runtime>);
impl Default for TokioRuntimeResource {
fn default() -> Self {
// Building the Tokio runtime is startup-time initialization; failure
// here means the OS refused to create threads, which is unrecoverable.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.expect("failed to build shared Tokio runtime");
Self(Arc::new(rt))
}
}
+13 -34
View File
@@ -31,7 +31,7 @@ use crate::events::{
};
use crate::game_plugin::RecordingReplay;
use crate::progress_plugin::{ProgressResource, ProgressStoragePath};
use crate::resources::{GameStateResource, SyncStatus, SyncStatusResource};
use crate::resources::{GameStateResource, SyncStatus, SyncStatusResource, TokioRuntimeResource};
use crate::stats_plugin::{LatestReplayPath, ReplayHistoryResource, StatsResource, StatsStoragePath};
// ---------------------------------------------------------------------------
@@ -101,6 +101,7 @@ impl SyncPlugin {
impl Plugin for SyncPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(SyncProviderResource(self.provider.clone()))
.init_resource::<TokioRuntimeResource>()
.init_resource::<SyncStatusResource>()
.init_resource::<PullTaskResult>()
.init_resource::<PullTask>()
@@ -130,19 +131,14 @@ impl Plugin for SyncPlugin {
/// Startup system: spawns the async pull task and sets status to `Syncing`.
fn start_pull(
provider: Res<SyncProviderResource>,
rt: Res<TokioRuntimeResource>,
mut task_res: ResMut<PullTask>,
mut status: ResMut<SyncStatusResource>,
) {
let provider = provider.0.clone();
let rt = rt.0.clone();
let task = AsyncComputeTaskPool::get().spawn(async move {
// Bevy's AsyncComputeTaskPool uses async-executor (not Tokio), but
// reqwest/hyper require a Tokio reactor for DNS and HTTP I/O. Provide
// a short-lived single-threaded runtime for this network round-trip.
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| SyncError::Network(format!("tokio rt: {e}")))?
.block_on(provider.pull())
rt.block_on(provider.pull())
});
task_res.0 = Some(task);
status.0 = SyncStatus::Syncing;
@@ -153,6 +149,7 @@ fn start_pull(
fn handle_manual_sync_request(
mut events: MessageReader<ManualSyncRequestEvent>,
provider: Res<SyncProviderResource>,
rt: Res<TokioRuntimeResource>,
mut task_res: ResMut<PullTask>,
mut status: ResMut<SyncStatusResource>,
) {
@@ -164,12 +161,9 @@ fn handle_manual_sync_request(
return; // Already pulling — ignore.
}
let provider = provider.0.clone();
let rt = rt.0.clone();
let task = AsyncComputeTaskPool::get().spawn(async move {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| SyncError::Network(format!("tokio rt: {e}")))?
.block_on(provider.pull())
rt.block_on(provider.pull())
});
task_res.0 = Some(task);
status.0 = SyncStatus::Syncing;
@@ -274,6 +268,7 @@ fn poll_pull_result(
fn push_on_exit(
mut exit_events: MessageReader<AppExit>,
provider: Res<SyncProviderResource>,
rt: Res<TokioRuntimeResource>,
stats: Res<StatsResource>,
achievements: Res<AchievementsResource>,
progress: Res<ProgressResource>,
@@ -284,21 +279,7 @@ fn push_on_exit(
exit_events.clear();
let payload = build_payload(&stats.0, &achievements.0, &progress.0);
let provider = provider.0.clone();
// Prefer an existing tokio runtime; fall back to a temporary one for
// environments (e.g. tests, Android's non-Tokio async executor) where
// reqwest/hyper would otherwise panic with "no reactor running".
let result = match tokio::runtime::Handle::try_current() {
Ok(handle) => handle.block_on(provider.push(&payload)),
Err(_) => match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt.block_on(provider.push(&payload)),
Err(e) => Err(SyncError::Network(format!("tokio rt on exit: {e}"))),
},
};
let result = rt.0.block_on(provider.0.push(&payload));
match result {
Ok(_) => {}
// `UnsupportedPlatform` is the expected response of
@@ -327,6 +308,7 @@ fn push_on_exit(
fn push_replay_on_win(
mut wins: MessageReader<GameWonEvent>,
provider: Res<SyncProviderResource>,
rt: Res<TokioRuntimeResource>,
game: Res<GameStateResource>,
recording: Res<RecordingReplay>,
mut pending: ResMut<PendingReplayUpload>,
@@ -348,12 +330,9 @@ fn push_replay_on_win(
recording.moves.clone(),
);
let provider = provider.0.clone();
let rt = rt.0.clone();
let task = AsyncComputeTaskPool::get().spawn(async move {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| SyncError::Network(format!("tokio rt: {e}")))?
.block_on(provider.push_replay(&replay))
rt.block_on(provider.push_replay(&replay))
});
// If a previous upload is still in flight, drop it — the most
// recent win is the one whose share link the player will care
+20 -23
View File
@@ -53,6 +53,7 @@ use crate::events::{
};
use crate::font_plugin::FontResource;
use crate::settings_plugin::{SettingsResource, SettingsScreen, SettingsStoragePath};
use crate::resources::TokioRuntimeResource;
use crate::sync_plugin::SyncProviderResource;
use crate::ui_modal::spawn_modal;
use crate::ui_theme::{
@@ -301,6 +302,7 @@ fn handle_auth_button(
login_q: Query<&Interaction, (Changed<Interaction>, With<SyncLoginButton>)>,
register_q: Query<&Interaction, (Changed<Interaction>, With<SyncRegisterButton>)>,
fields: Query<(&SyncFieldKind, &SyncFieldBuffer)>,
rt: Res<TokioRuntimeResource>,
mut pending: ResMut<PendingAuthTask>,
mut error_nodes: Query<(&mut Text, &mut TextColor), With<SyncAuthError>>,
mut busy_nodes: Query<&mut Visibility, With<SyncBusyOverlay>>,
@@ -363,26 +365,23 @@ fn handle_auth_button(
let is_register = register_clicked;
let client = SolitaireServerClient::new(url.clone(), username.clone());
let pw = password.clone();
let rt = rt.0.clone();
let task = AsyncComputeTaskPool::get().spawn(async move {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| SyncError::Network(format!("tokio rt: {e}")))?
.block_on(async {
let (access_token, refresh_token) = if is_register {
client.register(&pw).await?
} else {
client.login(&pw).await?
};
// Fetch avatar URL immediately while we have the fresh token.
let avatar_url = client
.fetch_me_with_token(&access_token)
.await
.ok()
.and_then(|(_, url)| url);
Ok((access_token, refresh_token, avatar_url))
})
rt.block_on(async {
let (access_token, refresh_token) = if is_register {
client.register(&pw).await?
} else {
client.login(&pw).await?
};
// Fetch avatar URL immediately while we have the fresh token.
let avatar_url = client
.fetch_me_with_token(&access_token)
.await
.ok()
.and_then(|(_, url)| url);
Ok((access_token, refresh_token, avatar_url))
})
});
pending.task = Some(task);
@@ -575,6 +574,7 @@ fn handle_delete_cancel(
fn handle_delete_confirm(
confirm_q: Query<&Interaction, (Changed<Interaction>, With<DeleteConfirmButton>)>,
provider: Res<SyncProviderResource>,
rt: Res<TokioRuntimeResource>,
mut pending: ResMut<PendingDeleteTask>,
screen: Query<Entity, With<DeleteConfirmScreen>>,
mut commands: Commands,
@@ -587,12 +587,9 @@ fn handle_delete_confirm(
commands.entity(entity).despawn();
}
let provider = provider.0.clone();
let rt = rt.0.clone();
pending.0 = Some(AsyncComputeTaskPool::get().spawn(async move {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| SyncError::Network(format!("tokio rt: {e}")))?
.block_on(provider.delete_account())
rt.block_on(provider.delete_account())
}));
}