00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #pragma once
00015
00019 #include <vector>
00020 #include <map>
00021 #include <utility>
00022 #include <set>
00023
00024 #include "WaitFreeQueue.h"
00025 #include "LockFreePoolAllocator.h"
00026 #include "Lockable.h"
00027 #include "Socket.h"
00028 #include "IMessageHandler.h"
00029 #include "BasicSerializedDataTypes.h"
00030 #include "Datagram.h"
00031 #include "FragmentedTransferManager.h"
00032 #include "NetworkMessage.h"
00033 #include "Event.h"
00034 #include "DataSerializer.h"
00035 #include "DataDeserializer.h"
00036
00037 #include "MaxHeap.h"
00038 #include "Clock.h"
00039 #include "PolledTimer.h"
00040 #include "Thread.h"
00041
00042 namespace kNet
00043 {
00044
00045 class MessageConnection;
00046 class UDPMessageConnection;
00047 class TCPMessageConnection;
00048 class NetworkServer;
00049 class Network;
00050 class NetworkWorkerThread;
00051 class FragmentedSendManager;
00052
00053 #ifdef WIN32
00054 struct FragmentedSendManager::FragmentedTransfer;
00055 #endif
00056
00058 struct ConnectionStatistics
00059 {
00061 struct PingTrack
00062 {
00063 tick_t pingSentTick;
00064 tick_t pingReplyTick;
00065 unsigned long pingID;
00066 bool replyReceived;
00067 };
00069 std::vector<PingTrack> ping;
00070
00072 struct TrafficTrack
00073 {
00074 tick_t tick;
00075 unsigned long packetsIn;
00076 unsigned long packetsOut;
00077 unsigned long messagesIn;
00078 unsigned long messagesOut;
00079 unsigned long bytesIn;
00080 unsigned long bytesOut;
00081 };
00083 std::vector<TrafficTrack> traffic;
00084
00086 struct DatagramIDTrack
00087 {
00088 tick_t tick;
00089 packet_id_t packetID;
00090 };
00092 std::vector<DatagramIDTrack> recvPacketIDs;
00093 };
00094
00096 class NetworkMessagePriorityCmp
00097 {
00098 public:
00099 int operator ()(const NetworkMessage *a, const NetworkMessage *b)
00100 {
00101 assert(a && b);
00102 if (a->priority < b->priority) return -1;
00103 if (b->priority < a->priority) return 1;
00104
00105 if (a->MessageNumber() < b->MessageNumber()) return 1;
00106 if (b->MessageNumber() < a->MessageNumber()) return -1;
00107
00108 return 0;
00109 }
00110 };
00111
00113 enum ConnectionState
00114 {
00115 ConnectionPending,
00116 ConnectionOK,
00117 ConnectionDisconnecting,
00118 ConnectionPeerClosed,
00119 ConnectionClosed
00120 };
00121
00123 std::string ConnectionStateToString(ConnectionState state);
00124
00127 class MessageConnection : public RefCountable
00128 {
00129 public:
00130 virtual ~MessageConnection();
00131
00133 ConnectionState GetConnectionState() const;
00134
00136 bool IsReadOpen() const;
00137
00139 bool IsWriteOpen() const;
00140
00143 bool IsPending() const;
00144
00146 bool Connected() const { return IsReadOpen() || IsWriteOpen(); }
00147
00151 void RunModalClient();
00152
00158 bool WaitToEstablishConnection(int maxMSecsToWait = 500);
00159
00171 void Disconnect(int maxMSecsToWait = 500);
00172
00180 void Close(int maxMSecsToWait = 500);
00181
00182
00183
00184
00196 NetworkMessage *StartNewMessage(unsigned long id, size_t numBytes = 0);
00197
00207 void EndAndQueueMessage(NetworkMessage *msg, size_t numBytes = (size_t)(-1), bool internalQueue = false);
00208
00212 void SendMessage(unsigned long id, bool reliable, bool inOrder, unsigned long priority, unsigned long contentID,
00213 const char *data, size_t numBytes);
00214
00216 template<typename SerializableData>
00217 void SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
00218 bool reliable, unsigned long priority, unsigned long contentID = 0);
00219
00221 template<typename SerializableMessage>
00222 void Send(const SerializableMessage &data, unsigned long contentID = 0);
00223
00226 void PauseOutboundSends();
00227
00229 void ResumeOutboundSends();
00230
00232 size_t NumInboundMessagesPending() const { return inboundMessageQueue.Size(); }
00233
00235 size_t NumOutboundMessagesPending() const { return outboundQueue.Size() + outboundAcceptQueue.Size(); }
00236
00238 Socket *GetSocket() { return socket; }
00239
00241 EndPoint LocalEndPoint() const;
00242
00244 EndPoint RemoteEndPoint() const;
00245
00253 void SetMaximumDataSendRate(int numBytesPerSec, int numDatagramsPerSec);
00254
00256 void RegisterInboundMessageHandler(IMessageHandler *handler);
00257
00268 void Process(int maxMessagesToProcess = 100);
00269
00274 void WaitForMessage(int maxMSecsToWait);
00275
00287 NetworkMessage *ReceiveMessage(int maxMSecsToWait = -1);
00288
00291 void FreeMessage(NetworkMessage *msg);
00292
00294 std::string ToString() const;
00295
00297 void DumpStatus() const;
00298
00299
00300
00302 float RoundTripTime() const { return rtt; }
00303
00305 float Latency() const { return rtt / 2.f; }
00306
00308 float LastHeardTime() const { return Clock::TicksToMillisecondsF(Clock::TicksInBetween(Clock::Tick(), lastHeardTime)); }
00309
00310 float PacketsInPerSec() const { return packetsInPerSec; }
00311 float PacketsOutPerSec() const { return packetsOutPerSec; }
00312 float MsgsInPerSec() const { return msgsInPerSec; }
00313 float MsgsOutPerSec() const { return msgsOutPerSec; }
00314 float BytesInPerSec() const { return bytesInPerSec; }
00315 float BytesOutPerSec() const { return bytesOutPerSec; }
00316
00319 Lockable<ConnectionStatistics> statistics;
00320
00321 protected:
00322 friend class NetworkWorkerThread;
00323
00325 Network *owner;
00326
00328 NetworkServer *ownerServer;
00329
00332 NetworkWorkerThread *workerThread;
00333
00334 #ifdef _DEBUG
00335
00336 ThreadId workerThreadId;
00337
00339 bool InWorkerThreadContext() const;
00340
00342 bool InMainThreadContext() const;
00343
00345 bool IsWorkerThreadRunning() const { return workerThread != 0; }
00346 #endif
00347
00349 WaitFreeQueue<NetworkMessage*> outboundAcceptQueue;
00350
00352 WaitFreeQueue<NetworkMessage*> inboundMessageQueue;
00353
00356
00357 WaitFreeQueue<NetworkMessage*> outboundQueue;
00358
00360 Lockable<FragmentedSendManager> fragmentedSends;
00361
00363 FragmentedReceiveManager fragmentedReceives;
00364
00367 LockFreePoolAllocator<NetworkMessage> messagePool;
00368
00370 PolledTimer pingTimer;
00371
00373 PolledTimer statsRefreshTimer;
00374
00376 enum PacketSendResult
00377 {
00378 PacketSendOK,
00379 PacketSendSocketClosed,
00380 PacketSendSocketFull,
00381 PacketSendNoMessages,
00382 PacketSendThrottled
00383 };
00384
00386 virtual PacketSendResult SendOutPacket() = 0;
00387
00389 virtual void SendOutPackets() = 0;
00390
00392 virtual unsigned long TimeUntilCanSendPacket() const = 0;
00393
00395 void UpdateConnection();
00396
00398 virtual void DoUpdateConnection() {}
00399
00401 void SetPeerClosed();
00402
00403 virtual void DumpConnectionStatus() const {}
00404
00406 Event NewOutboundMessagesEvent() const;
00407
00409 enum SocketReadResult
00410 {
00411 SocketReadOK,
00412 SocketReadError,
00413 SocketReadThrottled,
00414 };
00415
00421 virtual SocketReadResult ReadSocket(size_t &bytesRead) = 0;
00422
00423 SocketReadResult ReadSocket();
00424
00426 void SetWorkerThread(NetworkWorkerThread *thread);
00427
00428 void HandleInboundMessage(packet_id_t packetID, const char *data, size_t numBytes);
00429
00431 NetworkMessage *AllocateNewMessage();
00432
00433
00434 void SendPingRequestMessage();
00435
00436 void HandlePingRequestMessage(const char *data, size_t numBytes);
00437
00438 void HandlePingReplyMessage(const char *data, size_t numBytes);
00439
00440
00441 void FreeMessageData();
00442
00444 void DetectConnectionTimeOut();
00445
00447 void ComputeStats();
00448
00450 void AddOutboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages);
00451
00453 void AddInboundStats(unsigned long numBytes, unsigned long numPackets, unsigned long numMessages);
00454
00456 void AcceptOutboundMessages();
00457
00459 virtual void PerformDisconnection() = 0;
00460
00462 IMessageHandler *inboundMessageHandler;
00463
00465 Socket *socket;
00466
00468 ConnectionState connectionState;
00469
00471 bool bOutboundSendsPaused;
00472
00473 friend class NetworkServer;
00474 friend class Network;
00475
00477 Event eventMsgsOutAvailable;
00478
00479 void operator=(const MessageConnection &);
00480 MessageConnection(const MessageConnection &);
00481
00482 float rtt;
00483 tick_t lastHeardTime;
00484 float packetsInPerSec;
00485 float packetsOutPerSec;
00486 float msgsInPerSec;
00487 float msgsOutPerSec;
00488 float bytesInPerSec;
00489 float bytesOutPerSec;
00490
00493 unsigned long outboundMessageNumberCounter;
00494
00497 unsigned long outboundReliableMessageNumberCounter;
00498
00500 typedef std::pair<u32, u32> MsgContentIDPair;
00501
00502 typedef std::map<MsgContentIDPair, std::pair<packet_id_t, tick_t> > ContentIDReceiveTrack;
00503
00506 ContentIDReceiveTrack inboundContentIDStamps;
00507
00508 typedef std::map<MsgContentIDPair, NetworkMessage*> ContentIDSendTrack;
00509
00510 ContentIDSendTrack outboundContentIDMessages;
00511
00512 void CheckAndSaveOutboundMessageWithContentID(NetworkMessage *msg);
00513
00514 void ClearOutboundMessageWithContentID(NetworkMessage *msg);
00515
00520 bool CheckAndSaveContentIDStamp(u32 messageID, u32 contentID, packet_id_t packetID);
00521
00522 void SplitAndQueueMessage(NetworkMessage *message, bool internalQueue, size_t maxFragmentSize);
00523
00524 static const unsigned long MsgIdPingRequest = 1;
00525 static const unsigned long MsgIdPingReply = 2;
00526 static const unsigned long MsgIdFlowControlRequest = 3;
00527 static const unsigned long MsgIdPacketAck = 4;
00528 static const unsigned long MsgIdDisconnect = 0x3FFFFFFF;
00529 static const unsigned long MsgIdDisconnectAck = 0x3FFFFFFE;
00530
00532 explicit MessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState);
00533
00534 virtual void Initialize() {}
00535
00536 virtual bool HandleMessage(packet_id_t , u32 , const char * , size_t ) { return false; }
00537 };
00538
00539 template<typename SerializableData>
00540 void MessageConnection::SendStruct(const SerializableData &data, unsigned long id, bool inOrder,
00541 bool reliable, unsigned long priority, unsigned long contentID)
00542 {
00543 assert(InMainThreadContext());
00544
00545 const size_t dataSize = data.Size();
00546
00547 NetworkMessage *msg = StartNewMessage(id, dataSize);
00548
00549 if (dataSize > 0)
00550 {
00551 DataSerializer mb(msg->data, dataSize);
00552 data.SerializeTo(mb);
00553 assert(mb.BytesFilled() == dataSize);
00554 }
00555
00556 msg->id = id;
00557 msg->contentID = contentID;
00558 msg->inOrder = inOrder;
00559 msg->priority = priority;
00560 msg->reliable = reliable;
00561
00562 EndAndQueueMessage(msg);
00563 }
00564
00565 template<typename SerializableMessage>
00566 void MessageConnection::Send(const SerializableMessage &data, unsigned long contentID)
00567 {
00568 assert(InMainThreadContext());
00569
00570 const size_t dataSize = data.Size();
00571
00572 NetworkMessage *msg = StartNewMessage(data.MessageID(), dataSize);
00573
00574 if (dataSize > 0)
00575 {
00576 DataSerializer mb(msg->data, dataSize);
00577 data.SerializeTo(mb);
00578 assert(mb.BytesFilled() == dataSize);
00579 }
00580
00581 msg->id = data.MessageID();
00582 msg->contentID = contentID;
00583 msg->inOrder = data.inOrder;
00584 msg->priority = data.priority;
00585 msg->reliable = data.reliable;
00586
00587 EndAndQueueMessage(msg);
00588 }
00589
00590 }