目录
完整目录树
app/
├─ controller/
│ ├─ api/
│ │ └─ v1/
│ │ ├─ ChatController.php # 聊天 HTTP API
│ │ ├─ MessageController.php # 消息管理
│ │ └─ ContactController.php # 联系人管理
│ └─ websocket/
│ └─ ChatHandler.php # WebSocket 连接处理
│
├─ model/
│ └─ eloquent/
│ ├─ Message.php # 消息模型
│ ├─ Conversation.php # 会话模型
│ ├─ Contact.php # 联系人模型
│ └─ MessageRead.php # 消息已读记录
│
├─ middleware/
│ ├─ auth/
│ │ ├─ JwtAuthMiddleware.php # JWT 认证
│ │ └─ WebSocketAuthMiddleware.php # WebSocket 认证
│ └─ rate_limit/
│ └─ MessageRateLimitMiddleware.php # 消息限流
│
├─ process/
│ ├─ websocket/
│ │ └─ ChatServer.php # WebSocket 服务进程
│ ├─ task/
│ │ ├─ MessagePushTask.php # 消息推送任务
│ │ └─ OfflineMessageTask.php # 离线消息处理
│ └─ monitor/
│ └─ ConnectionMonitor.php # 连接监控
│
├─ service/
│ ├─ chat/
│ │ ├─ SendMessageService.php # 发送消息
│ │ ├─ CreateConversationService.php # 创建会话
│ │ ├─ MarkMessageReadService.php # 标记已读
│ │ └─ GetChatHistoryService.php # 获取历史消息
│ ├─ connection/
│ │ ├─ HandleConnectionService.php # 处理连接
│ │ ├─ BroadcastMessageService.php # 广播消息
│ │ └─ ManagePresenceService.php # 在线状态管理
│ └─ notification/
│ └─ PushOfflineMessageService.php # 推送离线消息
│
├─ domain/
│ ├─ chat/
│ │ ├─ entity/
│ │ │ ├─ Message.php # 消息实体
│ │ │ ├─ Conversation.php # 会话实体
│ │ │ └─ Participant.php # 参与者实体
│ │ ├─ enum/ # 枚举
│ │ │ ├─ MessageType.php # 消息类型枚举 (text/image/file)
│ │ │ ├─ ConversationType.php # 会话类型枚举 (private/group)
│ │ │ └─ MessageStatus.php # 消息状态枚举
│ │ ├─ vo/ # 值对象
│ │ │ └─ MessageContent.php # 消息内容
│ │ ├─ event/
│ │ │ ├─ MessageSent.php # 消息已发送
│ │ │ ├─ MessageReceived.php # 消息已接收
│ │ │ ├─ MessageRead.php # 消息已读
│ │ │ └─ UserOnlineStatusChanged.php # 用户在线状态变更
│ │ └─ rule/
│ │ ├─ MessageValidationRule.php # 消息验证规则
│ │ └─ ConversationAccessRule.php # 会话访问规则
│ │
│ └─ connection/
│ ├─ entity/
│ │ └─ Connection.php # 连接实体
│ ├─ enum/ # 枚举
│ │ └─ PresenceStatus.php # 在线状态枚举
│ ├─ vo/ # 值对象
│ │ ├─ ConnectionId.php # 连接 ID
│ │ └─ DeviceInfo.php # 设备信息
│ └─ event/
│ ├─ UserConnected.php # 用户已连接
│ └─ UserDisconnected.php # 用户已断开
│
├─ contract/
│ ├─ repository/
│ │ ├─ MessageRepositoryInterface.php
│ │ ├─ ConversationRepositoryInterface.php
│ │ └─ ConnectionRepositoryInterface.php
│ ├─ gateway/
│ │ ├─ PushNotificationGatewayInterface.php # 推送通知
│ │ └─ FileStorageGatewayInterface.php # 文件存储
│ └─ service/
│ └─ WebSocketServiceInterface.php
│
├─ infrastructure/
│ ├─ repository/
│ │ ├─ eloquent/
│ │ │ ├─ EloquentMessageRepository.php
│ │ │ └─ EloquentConversationRepository.php
│ │ └─ redis/
│ │ ├─ RedisConnectionRepository.php # 连接信息存储
│ │ └─ RedisPresenceRepository.php # 在线状态存储
│ │
│ ├─ gateway/
│ │ ├─ push/
│ │ │ ├─ FirebasePushGateway.php # Firebase 推送
│ │ │ └─ ApnsPushGateway.php # APNs 推送
│ │ └─ storage/
│ │ └─ S3FileStorageGateway.php # S3 文件存储
│ │
│ └─ websocket/
│ ├─ WorkermanWebSocketAdapter.php # Workerman 适配器
│ └─ ConnectionManager.php # 连接管理器
│
└─ support/
├─ helper/
│ └─ websocket_helper.php
└─ exception/
├─ ConnectionException.php
└─ MessageException.php

模块划分
核心模块
聊天模块 (Chat)
- 发送/接收消息
- 消息历史查询
- 消息已读状态
- 会话管理
连接模块 (Connection)
- WebSocket 连接管理
- 用户在线状态
- 心跳检测
- 断线重连
通知模块 (Notification)
- 离线消息推送
- 系统通知
- 消息提醒
文件模块 (File)
- 图片上传
- 文件上传
- 文件下载
目录职责
app/controller/websocket/
职责: WebSocket 连接处理入口
- 处理 WebSocket 握手
- 路由消息到对应服务
- 管理连接生命周期
app/process/websocket/
职责: WebSocket 服务进程
- 启动 WebSocket 服务器
- 监听端口
- 进程管理
app/service/chat/
职责: 聊天业务编排
- 发送消息流程
- 创建会话流程
- 消息已读流程
app/service/connection/
职责: 连接管理编排
- 连接建立/断开
- 消息广播
- 在线状态同步
app/domain/chat/
职责: 聊天领域逻辑
- 消息实体和业务规则
- 会话实体和访问控制
- 消息类型和状态管理
app/domain/connection/
职责: 连接领域逻辑
- 连接实体
- 在线状态管理
- 设备信息
app/infrastructure/repository/redis/
职责: Redis 存储实现
- 连接信息存储 (临时数据)
- 在线状态存储 (快速查询)
- 会话缓存
app/infrastructure/websocket/
职责: WebSocket 基础设施
- Workerman 适配器
- 连接管理器
- 消息序列化
关键代码示例
1. WebSocket 服务进程
php
<?php
declare(strict_types=1);
namespace app\process\websocket;
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
/**
* WebSocket 服务进程
*/
final class ChatServer
{
/**
 * Worker start callback: prints a startup line containing the worker's name.
 */
public function onWorkerStart(Worker $worker): void
{
    printf("WebSocket server started on %s\n", $worker->name);
}
/**
 * Connection-opened callback: prints the id of the new connection.
 */
public function onConnect(TcpConnection $connection): void
{
    printf("New connection: %s\n", $connection->id);
}
/**
 * Message callback: decodes the incoming JSON frame and routes it to ChatHandler.
 *
 * Frames that do not decode to a JSON object/array are answered with an error
 * envelope and dropped. Without this guard a malformed frame would reach
 * ChatHandler::handle(array) as null or a scalar and raise an uncaught TypeError.
 *
 * @param TcpConnection $connection connection the frame arrived on
 * @param string        $data       raw frame payload, expected to be JSON
 */
public function onMessage(TcpConnection $connection, string $data): void
{
    $message = json_decode($data, true);

    if (!is_array($message)) {
        // Same envelope shape that ChatHandler uses for its error replies.
        $connection->send(json_encode([
            'success' => false,
            'error' => 'Invalid message format',
        ]));

        return;
    }

    // Route to the handler resolved from the container.
    $handler = container()->get(\app\controller\websocket\ChatHandler::class);
    $handler->handle($connection, $message);
}
/**
 * Connection-closed callback: prints the connection id, then asks ChatHandler
 * to clean up whatever was registered for this connection.
 */
public function onClose(TcpConnection $connection): void
{
    printf("Connection closed: %s\n", $connection->id);

    container()
        ->get(\app\controller\websocket\ChatHandler::class)
        ->handleDisconnect($connection);
}
}

2. WebSocket 连接处理器
php
<?php
declare(strict_types=1);
namespace app\controller\websocket;
use app\service\connection\HandleConnectionService;
use app\service\chat\SendMessageService;
use Workerman\Connection\TcpConnection;
/**
* WebSocket 连接处理器
*/
final class ChatHandler
{
/**
 * @param HandleConnectionService $handleConnectionService used here to authenticate tokens and to register/unregister connections
 * @param SendMessageService      $sendMessageService      used here to send chat messages on behalf of an authenticated connection
 */
public function __construct(
private readonly HandleConnectionService $handleConnectionService,
private readonly SendMessageService $sendMessageService
) {
}
/**
 * Dispatches a decoded client frame to the handler for its "type" field.
 *
 * Unknown or missing types are answered with an error envelope.
 *
 * @param TcpConnection        $connection connection the frame arrived on
 * @param array<string, mixed> $message    decoded frame
 */
public function handle(TcpConnection $connection, array $message): void
{
    $type = $message['type'] ?? '';

    match ($type) {
        'auth' => $this->handleAuth($connection, $message),
        'message' => $this->handleMessage($connection, $message),
        'ping' => $this->handlePing($connection),
        'read' => $this->handleRead($connection, $message),
        default => $this->sendError($connection, 'Unknown message type'),
    };
}

/**
 * Handles a "read" frame.
 *
 * The dispatcher routed "read" frames here, but the method did not exist, so
 * any client sending {"type":"read"} triggered a fatal "undefined method" error.
 * Until a mark-as-read service is injected into this handler, the frame is
 * rejected with an explicit error instead.
 *
 * @param array<string, mixed> $message decoded frame (currently unused)
 */
private function handleRead(TcpConnection $connection, array $message): void
{
    if (!isset($connection->userId)) {
        $this->sendError($connection, 'Not authenticated');

        return;
    }

    // TODO: delegate to the mark-as-read service once it is a dependency of this class.
    $this->sendError($connection, 'Read receipts are not supported yet');
}
/**
 * Authenticates the connection from the frame's "token" field.
 *
 * On success the connection is registered for the user, the user id is bound
 * to the connection object, and a success envelope is sent. On failure an
 * error envelope is sent and the connection is closed.
 *
 * @param array<string, mixed> $message decoded frame; "token" must be a non-empty string
 */
private function handleAuth(TcpConnection $connection, array $message): void
{
    $token = $message['token'] ?? null;

    // A non-string token would raise a TypeError at authenticate(string), which
    // the catch below (\Exception only) would not intercept.
    if (!is_string($token) || $token === '') {
        $this->sendError($connection, 'Missing token');
        $connection->close();

        return;
    }

    try {
        $userId = $this->handleConnectionService->authenticate($token);

        // Workerman connection ids are integers; register() is declared with a
        // string id, so cast explicitly (strict_types forbids the coercion).
        $this->handleConnectionService->register((string) $connection->id, $userId);

        // Bind the user id to the connection so later frames can be attributed.
        $connection->userId = $userId;

        $this->sendSuccess($connection, [
            'type' => 'auth',
            'message' => 'Authentication successful',
        ]);
    } catch (\Exception $e) {
        $this->sendError($connection, $e->getMessage());
        $connection->close();
    }
}
/**
 * Sends a chat message on behalf of the authenticated connection.
 *
 * Expected frame fields: "to" (int receiver id), "content" (string) and the
 * optional "message_type" (string, defaults to "text"). Frames with missing or
 * mistyped fields are rejected with an error envelope before the service is
 * called; previously they surfaced as uncaught TypeErrors.
 *
 * @param array<string, mixed> $message decoded frame
 */
private function handleMessage(TcpConnection $connection, array $message): void
{
    if (!isset($connection->userId)) {
        $this->sendError($connection, 'Not authenticated');

        return;
    }

    $receiverId = $message['to'] ?? null;
    $content = $message['content'] ?? null;
    $messageType = $message['message_type'] ?? 'text';

    if (!is_int($receiverId) || !is_string($content) || !is_string($messageType)) {
        $this->sendError($connection, 'Invalid message payload');

        return;
    }

    try {
        $result = $this->sendMessageService->handle(
            senderId: $connection->userId,
            receiverId: $receiverId,
            content: $content,
            type: $messageType
        );

        $this->sendSuccess($connection, [
            'type' => 'message',
            'message_id' => $result->id(),
            'timestamp' => $result->createdAt()->getTimestamp(),
        ]);
    } catch (\ValueError $e) {
        // Enum::from() reports an unknown backing value as ValueError, which is
        // an \Error and would otherwise escape the \Exception catch below.
        $this->sendError($connection, 'Invalid message payload');
    } catch (\Exception $e) {
        $this->sendError($connection, $e->getMessage());
    }
}
/**
 * Answers a "ping" frame with a "pong" frame carrying the current Unix time.
 */
private function handlePing(TcpConnection $connection): void
{
    $pong = [
        'type' => 'pong',
        'timestamp' => time(),
    ];

    $connection->send(json_encode($pong));
}
/**
 * Unregisters the connection if it had been authenticated.
 *
 * Connections that never bound a user id were never registered, so there is
 * nothing to clean up for them.
 */
public function handleDisconnect(TcpConnection $connection): void
{
    if (!isset($connection->userId)) {
        return;
    }

    // Workerman connection ids are integers; unregister() is declared with a
    // string id, so cast explicitly (strict_types forbids the coercion).
    $this->handleConnectionService->unregister((string) $connection->id);
}
/**
 * Sends a success envelope: {"success": true, "data": $data}.
 *
 * @param array<string, mixed> $data payload placed under the "data" key
 */
private function sendSuccess(TcpConnection $connection, array $data): void
{
    $envelope = ['success' => true, 'data' => $data];

    $connection->send(json_encode($envelope));
}
/**
 * Sends an error envelope: {"success": false, "error": $error}.
 */
private function sendError(TcpConnection $connection, string $error): void
{
    $envelope = ['success' => false, 'error' => $error];

    $connection->send(json_encode($envelope));
}
}

3. 发送消息服务
php
<?php
declare(strict_types=1);
namespace app\service\chat;
use app\contract\repository\MessageRepositoryInterface;
use app\contract\repository\ConversationRepositoryInterface;
use app\domain\chat\entity\Message;
use app\domain\chat\vo\MessageContent;
use app\domain\chat\enum\MessageType;
use app\service\connection\BroadcastMessageService;
use support\Db;
/**
* 发送消息服务
*/
final class SendMessageService
{
/**
 * @param MessageRepositoryInterface      $messageRepository       used here to persist new messages
 * @param ConversationRepositoryInterface $conversationRepository  used here to find or create the private conversation
 * @param BroadcastMessageService         $broadcastMessageService used here to push the new message to the receiver
 */
public function __construct(
private readonly MessageRepositoryInterface $messageRepository,
private readonly ConversationRepositoryInterface $conversationRepository,
private readonly BroadcastMessageService $broadcastMessageService
) {
}
/**
 * Stores a private message from $senderId to $receiverId and pushes it to the receiver.
 *
 * The message is persisted inside a database transaction; the real-time push
 * happens only after the transaction callback has returned. Pushing inside the
 * transaction could deliver a message that is subsequently rolled back, and it
 * kept the transaction open during network I/O.
 *
 * @param int    $senderId   id of the sending user
 * @param int    $receiverId id of the receiving user
 * @param string $content    raw message content
 * @param string $type       backing value of a MessageType case
 *
 * @return Message the persisted message entity
 *
 * @throws \InvalidArgumentException when $type is not a known MessageType value
 */
public function handle(
    int $senderId,
    int $receiverId,
    string $content,
    string $type = 'text'
): Message {
    // MessageType::from() would raise ValueError (an \Error) on unknown input,
    // which callers catching \Exception do not see; fail with an exception instead.
    $messageType = MessageType::tryFrom($type)
        ?? throw new \InvalidArgumentException("Unsupported message type: {$type}");

    $message = Db::transaction(
        function () use ($senderId, $receiverId, $content, $messageType): Message {
            // 1. Find or create the private conversation.
            $conversation = $this->conversationRepository->findOrCreatePrivate(
                $senderId,
                $receiverId
            );

            // 2. Build the message entity.
            $message = Message::create(
                conversationId: $conversation->id(),
                senderId: $senderId,
                content: MessageContent::fromString($content),
                type: $messageType
            );

            // 3. Validate; an exception here rolls the transaction back.
            $message->validate();

            // 4. Persist.
            $this->messageRepository->save($message);

            return $message;
        }
    );

    // 5. Push to the receiver in real time, now that the write has gone through.
    // NOTE(review): Message::create() builds the entity with id 0 and save()
    // returns nothing here, so the id below may still be 0 — confirm how the
    // repository reports the generated id.
    $this->broadcastMessageService->sendToUser($receiverId, [
        'type' => 'new_message',
        'message' => [
            'id' => $message->id(),
            'conversation_id' => $message->conversationId(),
            'sender_id' => $senderId,
            'content' => $content,
            'type' => $type,
            'created_at' => $message->createdAt()->format('Y-m-d H:i:s'),
        ],
    ]);

    return $message;
}
}

4. 消息实体
php
<?php
declare(strict_types=1);
namespace app\domain\chat\entity;
use app\domain\chat\vo\MessageContent;
use app\domain\chat\enum\MessageType;
use app\domain\chat\enum\MessageStatus;
use app\domain\chat\event\MessageSent;
use app\domain\chat\exception\InvalidMessageException;
/**
* 消息实体
*/
final class Message
{
/** @var list<object> domain events recorded on this entity and not yet released */
private array $domainEvents = [];
/**
 * Private so that instances are only built through the named constructor create().
 * Every field except $status is readonly; $status is changed by the markAs*() methods.
 */
private function __construct(
private readonly int $id,
private readonly int $conversationId,
private readonly int $senderId,
private readonly MessageContent $content,
private readonly MessageType $type,
private MessageStatus $status,
private readonly \DateTimeImmutable $createdAt
) {
}
/**
 * Named constructor for a brand-new message.
 *
 * The entity starts with id 0, status Sent and the current time as its
 * creation time, and a MessageSent event is recorded on it.
 */
public static function create(
    int $conversationId,
    int $senderId,
    MessageContent $content,
    MessageType $type
): self {
    $created = new self(
        0,
        $conversationId,
        $senderId,
        $content,
        $type,
        MessageStatus::Sent,
        new \DateTimeImmutable()
    );

    $created->recordEvent(new MessageSent($created));

    return $created;
}
/**
 * Checks the message against its content rules.
 *
 * @throws InvalidMessageException when the content is empty, or when a text
 *                                 message's content length exceeds 5000
 */
public function validate(): void
{
    $body = $this->content;

    if ($body->isEmpty()) {
        throw new InvalidMessageException('Message content cannot be empty');
    }

    // The length limit applies to text messages only.
    $isOverlongText = $this->type->isText() && $body->length() > 5000;

    if ($isOverlongText) {
        throw new InvalidMessageException('Text message too long');
    }
}
/**
 * Marks the message as delivered.
 *
 * Only a message that is still in the Sent state is moved to Delivered. A late
 * delivery acknowledgement must not downgrade a message that has already been
 * marked as Read.
 */
public function markAsDelivered(): void
{
    if ($this->status === MessageStatus::Sent) {
        $this->status = MessageStatus::Delivered;
    }
}
/**
 * Sets the status to Read, regardless of the current status.
 */
public function markAsRead(): void
{
$this->status = MessageStatus::Read;
}
// Getters
/** Message id; 0 for an entity built by create(). */
public function id(): int
{
return $this->id;
}
/** Id of the conversation this message belongs to. */
public function conversationId(): int
{
return $this->conversationId;
}
/** Id of the user who sent the message. */
public function senderId(): int
{
return $this->senderId;
}
/** Message content value object. */
public function content(): MessageContent
{
return $this->content;
}
/** Message type enum case. */
public function type(): MessageType
{
return $this->type;
}
/** Current status; changed by the markAs*() methods. */
public function status(): MessageStatus
{
return $this->status;
}
/** Creation time set when the entity was constructed. */
public function createdAt(): \DateTimeImmutable
{
return $this->createdAt;
}
/**
 * Appends a domain event to the pending list returned by releaseEvents().
 */
private function recordEvent(object $event): void
{
$this->domainEvents[] = $event;
}
/**
 * Returns the recorded domain events and empties the pending list.
 *
 * @return list<object> the events recorded since the previous release
 */
public function releaseEvents(): array
{
    try {
        // The return value is captured before the finally block resets the list.
        return $this->domainEvents;
    } finally {
        $this->domainEvents = [];
    }
}
}

5. 连接管理服务
php
<?php
declare(strict_types=1);
namespace app\service\connection;
use app\contract\repository\ConnectionRepositoryInterface;
use app\domain\connection\entity\Connection;
use app\domain\connection\vo\ConnectionId;
use app\domain\connection\enum\PresenceStatus;
/**
* 连接管理服务
*/
final class HandleConnectionService
{
/**
 * @param ConnectionRepositoryInterface $connectionRepository used here to save, look up and delete connection records
 */
public function __construct(
private readonly ConnectionRepositoryInterface $connectionRepository
) {
}
/**
 * Verifies the token and returns the user id carried in its payload.
 *
 * A payload without an integer "user_id" is rejected with an exception.
 * Previously the missing key produced null, which violated the int return type
 * and surfaced as a TypeError that callers catching \Exception did not handle.
 *
 * @param string $token JWT presented by the client
 *
 * @return int authenticated user id
 *
 * @throws \RuntimeException when the payload carries no integer "user_id"
 */
public function authenticate(string $token): int
{
    $payload = $this->verifyJwtToken($token);
    $userId = $payload['user_id'] ?? null;

    if (!is_int($userId)) {
        throw new \RuntimeException('Invalid authentication token');
    }

    return $userId;
}
/**
 * Records a new online connection for the user and announces the user as online.
 *
 * @param string $connectionId transport-level connection id
 * @param int    $userId       owner of the connection
 */
public function register(string $connectionId, int $userId): void
{
    // Build the connection entity and hand it straight to the repository.
    $this->connectionRepository->save(
        Connection::create(
            connectionId: ConnectionId::fromString($connectionId),
            userId: $userId,
            status: PresenceStatus::Online
        )
    );

    $this->broadcastPresenceChange($userId, 'online');
}
/**
 * Removes a connection record and announces the user as offline once the
 * repository reports no remaining active connections for that user.
 *
 * Unknown connection ids are ignored.
 */
public function unregister(string $connectionId): void
{
    $id = ConnectionId::fromString($connectionId);
    $connection = $this->connectionRepository->findByConnectionId($id);

    if ($connection !== null) {
        $this->connectionRepository->delete($connection);

        // The user may still be connected from elsewhere; only announce
        // "offline" when nothing is left after the delete.
        $userId = $connection->userId();
        if (!$this->connectionRepository->hasActiveConnections($userId)) {
            $this->broadcastPresenceChange($userId, 'offline');
        }
    }
}
/**
 * Placeholder for JWT verification.
 *
 * NOTE(review): this stub performs no verification and always returns an empty
 * payload, so authenticate() finds no "user_id" in it. A real implementation
 * must be supplied before use.
 *
 * @return array<string, mixed> token payload (currently always empty)
 */
private function verifyJwtToken(string $token): array
{
// JWT verification logic goes here
// Return the decoded payload
return [];
}
/**
 * Placeholder for announcing a presence change; the body is currently empty.
 *
 * @param string $status callers in this class pass 'online' or 'offline'
 */
private function broadcastPresenceChange(int $userId, string $status): void
{
// Broadcast the presence change (not implemented yet)
}
}

6. 广播消息服务
php
<?php
declare(strict_types=1);
namespace app\service\connection;
use app\contract\repository\ConnectionRepositoryInterface;
use app\infrastructure\websocket\ConnectionManager;
/**
* 广播消息服务
*/
final class BroadcastMessageService
{
/**
 * @param ConnectionRepositoryInterface $connectionRepository used here to look up a user's connections
 * @param ConnectionManager             $connectionManager    used here to send to one connection or broadcast to all
 */
public function __construct(
private readonly ConnectionRepositoryInterface $connectionRepository,
private readonly ConnectionManager $connectionManager
) {
}
/**
 * Sends $data, JSON-encoded, to every connection the repository returns for the user.
 *
 * The payload is encoded once rather than once per connection, and an encoding
 * failure now raises instead of passing json_encode()'s false result on to the
 * connection manager. Nothing is encoded or sent when the user has no connections.
 *
 * @param array<string, mixed> $data payload to encode
 *
 * @throws \JsonException when $data cannot be encoded as JSON
 */
public function sendToUser(int $userId, array $data): void
{
    $connections = $this->connectionRepository->findByUserId($userId);

    if ($connections === []) {
        return;
    }

    $payload = json_encode($data, JSON_THROW_ON_ERROR);

    foreach ($connections as $connection) {
        $this->connectionManager->send(
            $connection->connectionId()->toString(),
            $payload
        );
    }
}
/**
 * Sends $data to every participant of the conversation, one user at a time.
 *
 * @param array<string, mixed> $data payload forwarded to sendToUser()
 */
public function sendToConversation(int $conversationId, array $data): void
{
    foreach ($this->getConversationParticipants($conversationId) as $participantId) {
        $this->sendToUser($participantId, $data);
    }
}
/**
 * JSON-encodes $data and hands it to the connection manager's broadcast.
 *
 * @param array<string, mixed> $data payload to encode
 */
public function broadcast(array $data): void
{
    $encoded = json_encode($data);

    $this->connectionManager->broadcast($encoded);
}
/**
 * Placeholder for looking up the participant ids of a conversation.
 *
 * NOTE(review): this stub always returns an empty list, so
 * sendToConversation() currently sends nothing.
 *
 * @return array participant user ids (currently always empty)
 */
private function getConversationParticipants(int $conversationId): array
{
// Look up the conversation's participant ids (not implemented yet)
return [];
}
}

7. Redis 连接仓储
php
<?php
declare(strict_types=1);
namespace app\infrastructure\repository\redis;
use app\contract\repository\ConnectionRepositoryInterface;
use app\domain\connection\entity\Connection;
use app\domain\connection\vo\ConnectionId;
use app\domain\connection\enum\PresenceStatus;
use support\Redis;
/**
* Redis 连接仓储
*/
final class RedisConnectionRepository implements ConnectionRepositoryInterface
{
// Key prefix for per-connection records; the connection id is appended.
private const PREFIX = 'chat:connection:';
// Key prefix for the per-user set of connection ids; the user id is appended.
private const USER_PREFIX = 'chat:user:connections:';
// Expiry in seconds applied in save() to both the record and the user set.
// NOTE(review): nothing in this class refreshes the expiry, so a connection
// that stays open longer than this would drop out of Redis — confirm whether
// a heartbeat elsewhere re-saves it.
private const TTL = 3600; // 1 hour
/**
 * Stores the connection record and adds its id to the owner's connection set.
 *
 * Both the record and the set are given an expiry of TTL seconds.
 */
public function save(Connection $connection): void
{
    $connectionId = $connection->connectionId()->toString();

    Redis::setex(self::PREFIX . $connectionId, self::TTL, json_encode([
        'connection_id' => $connectionId,
        'user_id' => $connection->userId(),
        // A backed enum exposes its backing value as the ->value property;
        // calling ->value() as a method is an undefined-method error.
        'status' => $connection->status()->value,
        'connected_at' => $connection->connectedAt()->format('Y-m-d H:i:s'),
    ]));

    // Add the id to the user's connection set.
    $userKey = self::USER_PREFIX . $connection->userId();
    Redis::sadd($userKey, $connectionId);
    Redis::expire($userKey, self::TTL);
}
/**
 * Loads a connection record by id.
 *
 * Returns null when the key is absent, and also when the stored payload cannot
 * be decoded into the expected fields. Previously only a strict null counted as
 * a miss; a false result or corrupt JSON led to array access on a non-array.
 */
public function findByConnectionId(ConnectionId $connectionId): ?Connection
{
    $raw = Redis::get(self::PREFIX . $connectionId->toString());

    // Covers both "missing" conventions (null and false) in one check.
    if (!is_string($raw)) {
        return null;
    }

    $data = json_decode($raw, true);

    if (
        !is_array($data)
        || !isset($data['connection_id'], $data['user_id'], $data['status'], $data['connected_at'])
    ) {
        return null;
    }

    return Connection::reconstitute(
        connectionId: ConnectionId::fromString($data['connection_id']),
        userId: $data['user_id'],
        status: PresenceStatus::from($data['status']),
        connectedAt: new \DateTimeImmutable($data['connected_at'])
    );
}
/**
 * Returns the connections recorded for the user.
 *
 * Ids in the user's set whose connection record can no longer be loaded are
 * removed from the set as they are encountered. The records expire on their
 * own, so without this pruning stale ids stay in the set and
 * hasActiveConnections() keeps counting them.
 *
 * @return list<Connection>
 */
public function findByUserId(int $userId): array
{
    $userKey = self::USER_PREFIX . $userId;
    $connections = [];

    foreach (Redis::smembers($userKey) as $connectionId) {
        $connection = $this->findByConnectionId(
            ConnectionId::fromString($connectionId)
        );

        if ($connection === null) {
            // Stale id: its record is gone, so drop it from the set.
            Redis::srem($userKey, $connectionId);

            continue;
        }

        $connections[] = $connection;
    }

    return $connections;
}
/**
 * Deletes the connection record and removes its id from the owner's connection set.
 */
public function delete(Connection $connection): void
{
    $id = $connection->connectionId()->toString();

    Redis::del(self::PREFIX . $id);
    Redis::srem(self::USER_PREFIX . $connection->userId(), $id);
}
/**
 * Reports whether the user's connection set contains at least one id.
 */
public function hasActiveConnections(int $userId): bool
{
    $memberCount = Redis::scard(self::USER_PREFIX . $userId);

    return $memberCount > 0;
}
}

最佳实践
WebSocket 连接管理
- 认证机制: 使用 JWT token 进行 WebSocket 认证
- 心跳检测: 定期发送 ping/pong 保持连接活跃
- 断线重连: 客户端实现指数退避重连策略
- 连接池: 使用 Redis 管理连接信息,支持水平扩展
消息可靠性
- 消息确认: 实现消息送达确认机制
- 离线消息: 用户离线时存储消息,上线后推送
- 消息去重: 使用消息 ID 防止重复消息
- 消息顺序: 使用时间戳并辅以会话内递增的消息序号保证消息顺序 (仅靠时间戳在同一秒内或多台服务器时钟不一致时无法确定先后)
性能优化
- Redis 缓存: 连接信息、在线状态存储在 Redis
- 消息队列: 使用队列处理离线消息推送
- 数据库优化: 消息表按会话 ID 分区
- 连接限制: 限制单用户最大连接数