Fix threading issues?

main
Thomas Woischnig 2026-04-26 00:54:27 +02:00
parent da2a9431df
commit d1542df493
8 changed files with 94 additions and 160 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "com.incobyte.lobbyclient", "name": "com.incobyte.lobbyclient",
"version": "1.0.7", "version": "1.0.8",
"displayName": "Game Lobby Client", "displayName": "Game Lobby Client",
"description": "Provides a client for the game lobby server to list and join lobbies", "description": "Provides a client for the game lobby server to list and join lobbies",
"unity": "2022.3", "unity": "2022.3",

View File

@ -1,4 +1,5 @@
using System; using LobbyServerDto;
using System;
using System.IO; using System.IO;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
@ -25,7 +26,20 @@ namespace Lobbies
private NetworkStream? networkStream; private NetworkStream? networkStream;
private CancellationTokenSource? cancellationTokenSource = new CancellationTokenSource(); private CancellationTokenSource? cancellationTokenSource = new CancellationTokenSource();
private bool running = false; private bool running = false;
private readonly SemaphoreSlim sendLock = new SemaphoreSlim(1, 1); private readonly BufferRental bufferRental = new BufferRental(MaxMessageSize);
private async Task ReadExact(NetworkStream stream, Memory<byte> buffer, int length, CancellationToken token)
{
int offset = 0;
while (offset < length)
{
int read = await stream.ReadAsync(buffer.Slice(offset), token);
if (read == 0)
throw new EndOfStreamException();
offset += read;
}
}
internal async Task Connect(string host, int port) internal async Task Connect(string host, int port)
{ {
@ -60,80 +74,25 @@ namespace Lobbies
pingTask = Task.Run(() => PingLoop(token), token); pingTask = Task.Run(() => PingLoop(token), token);
Memory<byte> buffer = new byte[MaxMessageSize]; Memory<byte> buffer = new byte[MaxMessageSize];
Memory<byte> target = new byte[MaxMessageSize];
int bufferedBytes = 0;
int currentMessageLength = -1;
int currentMessageOffset = 0;
while (running && !token.IsCancellationRequested) while (running && !token.IsCancellationRequested)
{ {
if (bufferedBytes == buffer.Length) await ReadExact(networkStream, buffer, HeaderSize, token);
throw new InvalidDataException("Receive buffer overflow.");
int bytesRead = await networkStream.ReadAsync(buffer.Slice(bufferedBytes), token); int length = BitConverter.ToInt32(buffer.Span.Slice(0, HeaderSize));
if (bytesRead == 0)
break;
bufferedBytes += bytesRead; if (length < 0)
while (true)
{
if (currentMessageLength < 0)
{
if (bufferedBytes < HeaderSize)
break;
currentMessageLength = BitConverter.ToInt32(buffer.Span.Slice(0, HeaderSize));
if (currentMessageLength < 0)
throw new InvalidDataException("Negative message length received."); throw new InvalidDataException("Negative message length received.");
if (currentMessageLength > MaxMessageSize) if (length > MaxMessageSize)
throw new InvalidDataException($"Message too large: {currentMessageLength} > {MaxMessageSize}."); throw new InvalidDataException($"Message too large: {length} > {MaxMessageSize}.");
if (bufferedBytes > HeaderSize) if (length == 0)
buffer.Slice(HeaderSize, bufferedBytes - HeaderSize).CopyTo(buffer);
bufferedBytes -= HeaderSize;
currentMessageOffset = 0;
if (currentMessageLength == 0)
{
currentMessageLength = -1;
continue; continue;
}
}
int remainingMessageBytes = currentMessageLength - currentMessageOffset; await ReadExact(networkStream, buffer, length, token);
if (remainingMessageBytes <= 0)
{
DataReceived?.Invoke(currentMessageLength, target.Slice(0, currentMessageLength));
currentMessageLength = -1;
currentMessageOffset = 0;
continue;
}
if (bufferedBytes == 0) DataReceived?.Invoke(length, buffer.Slice(0, length));
break;
int chunkSize = Math.Min(bufferedBytes, remainingMessageBytes);
buffer.Slice(0, chunkSize).CopyTo(target.Slice(currentMessageOffset));
currentMessageOffset += chunkSize;
if (bufferedBytes > chunkSize)
buffer.Slice(chunkSize, bufferedBytes - chunkSize).CopyTo(buffer);
bufferedBytes -= chunkSize;
if (currentMessageOffset == currentMessageLength)
{
DataReceived?.Invoke(currentMessageLength, target.Slice(0, currentMessageLength));
currentMessageLength = -1;
currentMessageOffset = 0;
}
}
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
@ -185,37 +144,42 @@ namespace Lobbies
if (!running || networkStream == null) if (!running || networkStream == null)
return; return;
await sendLock.WaitAsync(token);
try try
{ {
await networkStream.WriteAsync(BitConverter.GetBytes(0), 0, 4, token); await networkStream.WriteAsync(BitConverter.GetBytes(0), 0, 4, token);
} }
finally catch
{ {
sendLock.Release();
} }
} }
internal async Task Send(byte[] buffer, int offset, int count) internal async Task Send(byte[] buffer, int offset, int count)
{ {
byte[] frame = bufferRental.Rent();
try try
{ {
if (!running || networkStream == null || cancellationTokenSource == null) if (!running || networkStream == null || cancellationTokenSource == null)
return; return;
await sendLock.WaitAsync(cancellationTokenSource.Token); if (count < 0 || count > MaxMessageSize)
try throw new ArgumentOutOfRangeException(nameof(count));
if (offset < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(offset));
BitConverter.GetBytes(count).CopyTo(frame, 0);
Buffer.BlockCopy(buffer, offset, frame, HeaderSize, count);
await networkStream.WriteAsync(frame, 0, HeaderSize + count, cancellationTokenSource.Token);
}
catch
{ {
await networkStream.WriteAsync(BitConverter.GetBytes(count), 0, 4, cancellationTokenSource.Token);
await networkStream.WriteAsync(buffer, offset, count, cancellationTokenSource.Token);
} }
finally finally
{ {
sendLock.Release(); bufferRental.Return(frame);
} }
} }
catch { }
}
internal void Stop() internal void Stop()
{ {

View File

@ -3,7 +3,6 @@ using Lobbies;
using LobbyClientTest; using LobbyClientTest;
using LobbyServerDto; using LobbyServerDto;
using System.Net; using System.Net;
using System.Net.WebSockets;
Console.WriteLine("Starting lobby client v0.7!"); Console.WriteLine("Starting lobby client v0.7!");
var lobbyClient = new LobbyClient(); var lobbyClient = new LobbyClient();

View File

@ -13,6 +13,10 @@ namespace LobbyServer
public delegate void ClientDisconnectedEventArgs(int clientId); public delegate void ClientDisconnectedEventArgs(int clientId);
public event ClientDisconnectedEventArgs? ClientDisconnected; public event ClientDisconnectedEventArgs? ClientDisconnected;
private const int HeaderSize = 4;
private const int MaxMessageSize = 4096;
private readonly BufferRental bufferRental = new BufferRental(MaxMessageSize);
internal class Client : IDisposable internal class Client : IDisposable
{ {
internal CancellationTokenSource? cancellationToken = null; internal CancellationTokenSource? cancellationToken = null;
@ -100,15 +104,36 @@ namespace LobbyServer
public async Task Send(int clientId, byte[] buffer, int offset, int count) public async Task Send(int clientId, byte[] buffer, int offset, int count)
{ {
byte[] frame = bufferRental.Rent();
try try
{ {
if (activeClients.TryGetValue(clientId, out var lobbyClient) && lobbyClient.stream != null && lobbyClient.cancellationToken != null) if (activeClients.TryGetValue(clientId, out var lobbyClient) && lobbyClient.stream != null && lobbyClient.cancellationToken != null)
{ {
await lobbyClient.stream.WriteAsync(BitConverter.GetBytes(count), 0, 4, lobbyClient.cancellationToken.Token); BitConverter.GetBytes(count).CopyTo(frame, 0);
await lobbyClient.stream.WriteAsync(buffer, offset, count, lobbyClient.cancellationToken.Token); Buffer.BlockCopy(buffer, offset, frame, HeaderSize, count);
await lobbyClient.stream.WriteAsync(frame, 0, HeaderSize + count, lobbyClient.cancellationToken.Token);
} }
} }
catch { } catch { }
finally
{
bufferRental.Return(frame);
}
}
private async Task ReadExact(NetworkStream stream, Memory<byte> buffer, int length, CancellationToken token)
{
int offset = 0;
while (offset < length)
{
int read = await stream.ReadAsync(buffer.Slice(offset), token);
if (read == 0)
throw new EndOfStreamException();
offset += read;
}
} }
private async Task ClientThread(TcpClient client) private async Task ClientThread(TcpClient client)
@ -120,8 +145,8 @@ namespace LobbyServer
{ {
var stream = client.GetStream(); var stream = client.GetStream();
Memory<byte> buffer = new byte[4096]; Memory<byte> buffer = new byte[MaxMessageSize];
Memory<byte> target = new byte[4096];
lobbyClient = new Client lobbyClient = new Client
{ {
@ -133,11 +158,6 @@ namespace LobbyServer
activeClients.TryAdd(myId, lobbyClient); activeClients.TryAdd(myId, lobbyClient);
int bufferedBytes = 0;
int currentMessageLength = -1;
int currentMessageOffset = 0;
bool validMessage = true;
var lobbyClientConnectionInfo = new LobbyClientConnectionInfo { Id = myId }; var lobbyClientConnectionInfo = new LobbyClientConnectionInfo { Id = myId };
byte[] sendBuffer = new byte[128]; byte[] sendBuffer = new byte[128];
int sendLen = lobbyClientConnectionInfo.Serialize(sendBuffer); int sendLen = lobbyClientConnectionInfo.Serialize(sendBuffer);
@ -145,70 +165,23 @@ namespace LobbyServer
while (running && !lobbyClient.cancellationToken.Token.IsCancellationRequested) while (running && !lobbyClient.cancellationToken.Token.IsCancellationRequested)
{ {
int bytesRead = await stream.ReadAsync(buffer.Slice(bufferedBytes), lobbyClient.cancellationToken.Token); await ReadExact(stream, buffer, HeaderSize, lobbyClient.cancellationToken.Token);
if (bytesRead == 0)
break;
bufferedBytes += bytesRead;
lobbyClient.lastSeenUtc = DateTime.UtcNow; lobbyClient.lastSeenUtc = DateTime.UtcNow;
while (true) int length = BitConverter.ToInt32(buffer.Span.Slice(0, HeaderSize));
{
if (currentMessageLength < 0)
{
if (bufferedBytes < 4)
break;
currentMessageLength = BitConverter.ToInt32(buffer.Span.Slice(0, 4)); if (length < 0)
if (currentMessageLength < 0)
throw new InvalidDataException("Negative message length received."); throw new InvalidDataException("Negative message length received.");
buffer.Slice(4, bufferedBytes - 4).CopyTo(buffer); if (length > MaxMessageSize)
bufferedBytes -= 4; throw new InvalidDataException($"Message too large: {length} > {MaxMessageSize}.");
currentMessageOffset = 0;
validMessage = currentMessageLength <= target.Length;
if (currentMessageLength == 0) if (length == 0)
{
currentMessageLength = -1;
continue; continue;
}
}
if (bufferedBytes == 0) await ReadExact(stream, buffer, length, lobbyClient.cancellationToken.Token);
break;
int chunkSize = Math.Min(bufferedBytes, currentMessageLength - currentMessageOffset); DataReceived?.Invoke(myId, length, buffer.Slice(0, length));
if (chunkSize > 0)
{
if (validMessage)
{
buffer.Slice(0, chunkSize).CopyTo(target.Slice(currentMessageOffset));
}
currentMessageOffset += chunkSize;
buffer.Slice(chunkSize, bufferedBytes - chunkSize).CopyTo(buffer);
bufferedBytes -= chunkSize;
}
if (currentMessageOffset < currentMessageLength)
break;
if (validMessage)
{
DataReceived?.Invoke(myId, currentMessageLength, target.Slice(0, currentMessageLength));
}
currentMessageLength = -1;
currentMessageOffset = 0;
validMessage = true;
}
if (bufferedBytes == buffer.Length && currentMessageLength < 0)
throw new InvalidDataException("Receive buffer overflow while waiting for message header.");
} }
} }
finally finally

View File

@ -1,11 +1,10 @@
using System.Net.Sockets; using System.Net.Sockets;
using System.Net;
using LobbyServerDto; using LobbyServerDto;
namespace LobbyServer namespace LobbyServer
{ {
/// <summary> /// <summary>
/// Small udp server to receive udp packets and report their remote ip and port to a event delegate /// Small udp server to receive udp packets and report their remote ip and port to an event delegate
/// </summary> /// </summary>
internal class UdpCandidateServer : IDisposable internal class UdpCandidateServer : IDisposable
{ {

View File

@ -1,5 +1,4 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Drawing;
namespace LobbyServerDto namespace LobbyServerDto
{ {