DIOCP开源项目-详解编码器和和如安在传输中参加紧缩和解压功能   
               添加时间:2013-5-30 点击量: 
 
              >>>>>>DIOCP评论辩论群:320641073
>>>>>>SVN源码和DEMO:https://code.google.com/p/diocp/ 
收集带宽有限,对数据进行紧缩再进行传送可以有效的哄骗带宽和进步传输的效力。
DIOCP中对数据进行紧缩传送,须要批改编码器和,先说说这两个器材的的用法和功能。
       
          举个例子:我们要把一台电脑快递回老家给正在上学的小弟应用,那么老家就是办事端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。
在这个过程中,发送一个对象(电脑)用到客户端的编码器,接管对象(电脑)用到办事端的。
          
          在之前编写的DIOCP例子中都应用了JSonStream对象进行传输,这个TJSonStream类,首要有两个项目组的数据,第一项目组包含JSon字符串数据,第二项目组包含Stream流数据。在三层数据保存例子中我们把客户端的恳求放在JSon中。办事端接管数据后经由过程办事端的还原成JSonStream对象。然掉队行逻辑的处理惩罚,办事端回写对象时,同过办事端的编码器对回写的JSonStream进行编码发送,客户端经由过程客户端的接管并还原成JSonStream对象。在办事端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,若是对Stream对象进行紧缩,在做紧缩中调试法度时发明一个70K的数据包进行一下紧缩,数据包可以变成7K了,对文本紧缩结果还是很不错的。
        当然我们容许本身定义和谈和编写编码和,我们可以定义本身的TStrStream类或者TXMLStream等等,然后编写响应的编码器和就行了。
        下面解析下代码
客户端代码:
var
  lvJSonStream, lvRecvObject:TJsonStream;
  lvStream:TStream;
  lvData:AnsiString;
  l, j, x:Integer;
begin
  lvJSonStream := TJsonStream.Create;
  try
    lvJSonStream.JSon := SO();
    lvJSonStream.JSon.I[cmdIndex] := 1001;   //打开一个SQL脚本,获取数据
    lvJSonStream.Json.S[sql] := mmoSQL.Lines.Text;
    FClientSocket.sendObject(lvJSonStream);
  finally
    lvJSonStream.Free;
  end;
起首是建树一个TJSonStream对象,然后设定信息,因为是产生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,
procedure TD10ClientSocket.sendObject(pvObject:TObject);
begin
  if FCoder = nil then raise Exception.Create(没有注册对象编码和(registerCoder)!);
  if not Active then Exit;
  FCoder.Encode(Self, pvObject);
end;
可以看出这里调用注册的编码器,调用Encode函数
客户端编码器的Encode函数如下
procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
    TObject);
var
  lvJSonStream:TJsonStream;
  lvJSonLength:Integer;
  lvStreamLength:Integer;
  sData, lvTemp:String;
  lvStream:TStream;
  lvTempBuf:PAnsiChar;
  lvBytes, lvTempBytes:TBytes;
  
  l:Integer;
  lvBufBytes:array[0..1023] of byte;
begin
  if pvObject = nil then exit;
  lvJSonStream := TJsonStream(pvObject);
  
  //是否紧缩流
  if (lvJSonStream.Stream <> nil) then
  begin
    if lvJSonStream.Json.O[config.stream.zip] <> nil then
    begin
      if lvJSonStream.Json.B[config.stream.zip] then
      begin
        //紧缩流
        TZipTools.compressStreamEx(lvJSonStream.Stream);
      end;
    end else if lvJSonStream.Stream.Size > 0 then
    begin
      //紧缩流
      TZipTools.compressStreamEx(lvJSonStream.Stream);
      lvJSonStream.Json.B[config.stream.zip] := true;
    end;
  end;
  sData := lvJSonStream.JSon.AsJSon(True);
  lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);
  lvJSonLength := Length(lvBytes);
  lvStream := lvJSonStream.Stream;
  lvJSonLength := TNetworkTools.htonl(lvJSonLength);
  if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;
  if lvStream <> nil then
  begin
    lvStreamLength := lvStream.Size;
  end else
  begin
    lvStreamLength := 0;
  end;
  lvStreamLength := TNetworkTools.htonl(lvStreamLength);
  if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;
  //json bytes
  if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;
  if lvStream.Size > 0 then
  begin
    lvStream.Position := 0;
    repeat
      l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
      if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit;
    until (l = 0);
  end;
end;
该项目组完成的功能有
1.断定Stream数据是否须要紧缩。
2.发送Json数据长度和Stream数据长度
3.发送Json数据
4.发送Stream数据
申明:
lvJSonLength := TNetworkTools.ntohl(lvJSonLength); 
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
这三行代码须要申明下,是为了兼容java,netty做办事端便利解码,当然我们也可以不进行转换。直接发送也是可以的。只要共同办事端就行了。和谈是本身定义的。
接下来是办事端IOCP队列中会收到接管数据的旌旗灯号。
function TIOCPObject.processIOQueued: Integer;
var
  BytesTransferred:Cardinal;
  lvResultStatus:BOOL;
  lvRet:Integer;
  lvIOData:POVERLAPPEDEx;
  lvDataObject:TObject;
  lvClientContext:TIOCPClientContext;
begin
  Result := IOCP_RESULT_OK;
  //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接管到数据为止
  lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle,
 
  .......
    if lvIOData.IO_TYPE = IO_TYPE_Accept then  //连接恳求
    begin
      TIODataMemPool.instance.giveBackIOData(lvIOData);
      PostWSARecv(lvClientContext);
    end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
    begin
      //参加到套接字对应的缓存中,处理惩罚逻辑
      lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
        lvIOData.Overlapped.InternalHigh);
      TIODataMemPool.instance.giveBackIOData(lvIOData);
      //持续送达接管恳求
      PostWSARecv(lvClientContext);
    end;    
  .........
end;
//参加到套接字对应的缓存中,处理惩罚逻辑 
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf, 
  lvIOData.Overlapped.InternalHigh); 
//这里会调用测验测验进行解码
procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
var
  lvObject:TObject;
begin
  FCS.Enter;
  try
    //参加到套接字对应的缓存
    FBuffers.AddBuffer(buf, len);
    //调用注册的<进行解码>
    lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
    if lvObject <> nil then
    try
      try
        //解码成功,调用营业逻辑的处理惩罚办法
        dataReceived(lvObject);
      except
        on E:Exception do
        begin
          TIOCPFileLogger.logErrMessage(截获处理惩罚逻辑异常! + e.Message);
        end;
      end; 
      //清理掉这一次分派的内存<若是没有可用的内存块>清理
      if FBuffers.validCount = 0 then
      begin
        FBuffers.clearBuffer;
      end;
    finally
      lvObject.Free;
    end;
  finally
    FCS.Leave;
  end;
end;
我们在之前的Demo中应用的是TIOCPJSonStreamDecoder
function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
var
  lvJSonLength, lvStreamLength:Integer;
  lvData:String;
  lvBuffer:array of Char;
  lvBufData:PAnsiChar;
  lvStream:TMemoryStream;
  lvJsonStream:TJsonStream;
  lvBytes:TBytes;
  lvValidCount:Integer;
begin
  Result := nil;
  //若是缓存中的数据长度不敷包头长度,解码失败<json字符串长度,流长度>
  lvValidCount := inBuf.validCount;
  if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then
  begin
    Exit;
  end;
  //记录读取地位
  inBuf.markReaderIndex;
  inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
  inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));
  lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
  lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
  //若是缓存中的数据不敷json的长度和流长度<申明数据还没有收取完毕>解码失败
  lvValidCount := inBuf.validCount;
  if lvValidCount < (lvJSonLength + lvStreamLength) then
  begin
    //返回buf的读取地位
    inBuf.restoreReaderIndex;
    exit;
  end else if (lvJSonLength + lvStreamLength) = 0 then
  begin
    //两个都为0<两个0>客户端可以用来作为主动重连应用
    TIOCPFileLogger.logDebugMessage(接管到一次[00]数据!);
    Exit;
  end;
  //解码成功
  lvJsonStream := TJsonStream.Create;
  Result := lvJsonStream;
  //读取json字符串
  if lvJSonLength > 0 then
  begin
    SetLength(lvBytes, lvJSonLength);
    ZeroMemory(@lvBytes[0], lvJSonLength);
    inBuf.readBuffer(@lvBytes[0], lvJSonLength);
    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
    lvJsonStream.Json := SO(lvData);
  end else
  begin
    TFileLogger.instance.logMessage(接管到一次JSon为空的一次数据恳求!, IOCP_ALERT_);
  end;
  //读取流数据 
  if lvStreamLength > 0 then
  begin
    GetMem(lvBufData, lvStreamLength);
    try
      inBuf.readBuffer(lvBufData, lvStreamLength);
      lvJsonStream.Stream.Size := 0;
      lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);
      //解压流
      if lvJsonStream.Json.B[config.stream.zip] then
      begin
        //解压
        TZipTools.unCompressStreamEX(lvJsonStream.Stream);
      end;
    finally
      FreeMem(lvBufData, lvStreamLength);
    end;
  end;
end;
//办事端中有三行代码来共同客户端的编码流
lvJSonLength := TNetworkTools.ntohl(lvJSonLength); 
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);    
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes); 
/////
办事端首要完成的功能有 
0.断定接管到的数据是否可以进行解码,若是不成以退出,解码不成功。
1.接管json长度,流数据长度
2.接管json数据,接管流数据存入JsonStream.json中,
3.按照json中config.stream.zip进行断定流数据是否须要解压.放入JsonStream.stream中 
4.解码成功返回JsonStream对象。
解码完成后可以看到
lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers); 
if lvObject <> nil then 
try //解码成功,调用营业逻辑的处理惩罚办法 
    dataReceived(lvObject); 
………
解码成功调用dataReceived,进行逻辑的处理惩罚。
总结:
   办事端的配套客户端的编码器,办事端的编码器配套客户端的。
无论对感情还是对生活,“只要甜不要苦”都是任性而孩子气的,因为我们也不完美,我们也会伤害人。正因为我们都不完美,也因为生活从不是事事如意,所以对这些“瑕疵”的收纳才让我们对生活、对他人的爱变得日益真实而具体。—— 汪冰《世界再亏欠你,也要敢于拥抱幸福》
                     
                  
     
  
 
    
    
>>>>>>DIOCP评论辩论群:320641073
>>>>>>SVN源码和DEMO:https://code.google.com/p/diocp/ 
收集带宽有限,对数据进行紧缩再进行传送可以有效的哄骗带宽和进步传输的效力。
DIOCP中对数据进行紧缩传送,须要批改编码器和,先说说这两个器材的的用法和功能。
举个例子:我们要把一台电脑快递回老家给正在上学的小弟应用,那么老家就是办事端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。
在这个过程中,发送一个对象(电脑)用到客户端的编码器,接管对象(电脑)用到办事端的。
在之前编写的DIOCP例子中都应用了JSonStream对象进行传输,这个TJSonStream类,首要有两个项目组的数据,第一项目组包含JSon字符串数据,第二项目组包含Stream流数据。在三层数据保存例子中我们把客户端的恳求放在JSon中。办事端接管数据后经由过程办事端的还原成JSonStream对象。然掉队行逻辑的处理惩罚,办事端回写对象时,同过办事端的编码器对回写的JSonStream进行编码发送,客户端经由过程客户端的接管并还原成JSonStream对象。在办事端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,若是对Stream对象进行紧缩,在做紧缩中调试法度时发明一个70K的数据包进行一下紧缩,数据包可以变成7K了,对文本紧缩结果还是很不错的。
当然我们容许本身定义和谈和编写编码和,我们可以定义本身的TStrStream类或者TXMLStream等等,然后编写响应的编码器和就行了。
下面解析下代码
客户端代码:
var
lvJSonStream, lvRecvObject:TJsonStream;
lvStream:TStream;
lvData:AnsiString;
l, j, x:Integer;
begin
lvJSonStream := TJsonStream.Create;
try
lvJSonStream.JSon := SO();
lvJSonStream.JSon.I[cmdIndex] := 1001; //打开一个SQL脚本,获取数据
lvJSonStream.Json.S[sql] := mmoSQL.Lines.Text;
FClientSocket.sendObject(lvJSonStream);
finally
lvJSonStream.Free;
end;
起首是建树一个TJSonStream对象,然后设定信息,因为是产生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,
procedure TD10ClientSocket.sendObject(pvObject:TObject);
begin
if FCoder = nil then raise Exception.Create(没有注册对象编码和(registerCoder)!);
if not Active then Exit;
FCoder.Encode(Self, pvObject);
end;
可以看出这里调用注册的编码器,调用Encode函数
客户端编码器的Encode函数如下
procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
TObject);
var
lvJSonStream:TJsonStream;
lvJSonLength:Integer;
lvStreamLength:Integer;
sData, lvTemp:String;
lvStream:TStream;
lvTempBuf:PAnsiChar;
lvBytes, lvTempBytes:TBytes;
l:Integer;
lvBufBytes:array[0..1023] of byte;
begin
if pvObject = nil then exit;
lvJSonStream := TJsonStream(pvObject);
//是否紧缩流
if (lvJSonStream.Stream <> nil) then
begin
if lvJSonStream.Json.O[config.stream.zip] <> nil then
begin
if lvJSonStream.Json.B[config.stream.zip] then
begin
//紧缩流
TZipTools.compressStreamEx(lvJSonStream.Stream);
end;
end else if lvJSonStream.Stream.Size > 0 then
begin
//紧缩流
TZipTools.compressStreamEx(lvJSonStream.Stream);
lvJSonStream.Json.B[config.stream.zip] := true;
end;
end;
sData := lvJSonStream.JSon.AsJSon(True);
lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);
lvJSonLength := Length(lvBytes);
lvStream := lvJSonStream.Stream;
lvJSonLength := TNetworkTools.htonl(lvJSonLength);
if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;
if lvStream <> nil then
begin
lvStreamLength := lvStream.Size;
end else
begin
lvStreamLength := 0;
end;
lvStreamLength := TNetworkTools.htonl(lvStreamLength);
if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;
//json bytes
if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;
if lvStream.Size > 0 then
begin
lvStream.Position := 0;
repeat
l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit;
until (l = 0);
end;
end;
该项目组完成的功能有
1.断定Stream数据是否须要紧缩。
2.发送Json数据长度和Stream数据长度
3.发送Json数据
4.发送Stream数据
申明:
lvJSonLength := TNetworkTools.ntohl(lvJSonLength); 
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
这三行代码须要申明下,是为了兼容java,netty做办事端便利解码,当然我们也可以不进行转换。直接发送也是可以的。只要共同办事端就行了。和谈是本身定义的。
接下来是办事端IOCP队列中会收到接管数据的旌旗灯号。
function TIOCPObject.processIOQueued: Integer;
var
BytesTransferred:Cardinal;
lvResultStatus:BOOL;
lvRet:Integer;
lvIOData:POVERLAPPEDEx;
lvDataObject:TObject;
lvClientContext:TIOCPClientContext;
begin
Result := IOCP_RESULT_OK;
//工作者线程会停止到GetQueuedCompletionStatus函数处,直到接管到数据为止
lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle,
.......
if lvIOData.IO_TYPE = IO_TYPE_Accept then //连接恳求
begin
TIODataMemPool.instance.giveBackIOData(lvIOData);
PostWSARecv(lvClientContext);
end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
begin
//参加到套接字对应的缓存中,处理惩罚逻辑
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
lvIOData.Overlapped.InternalHigh);
TIODataMemPool.instance.giveBackIOData(lvIOData);
//持续送达接管恳求
PostWSARecv(lvClientContext);
end;
.........
end;
//参加到套接字对应的缓存中,处理惩罚逻辑 
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf, 
  lvIOData.Overlapped.InternalHigh); 
//这里会调用测验测验进行解码
procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
var
lvObject:TObject;
begin
FCS.Enter;
try
//参加到套接字对应的缓存
FBuffers.AddBuffer(buf, len);
//调用注册的<进行解码>
lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
if lvObject <> nil then
try
try
//解码成功,调用营业逻辑的处理惩罚办法
dataReceived(lvObject);
except
on E:Exception do
begin
TIOCPFileLogger.logErrMessage(截获处理惩罚逻辑异常! + e.Message);
end;
end;
//清理掉这一次分派的内存<若是没有可用的内存块>清理
if FBuffers.validCount = 0 then
begin
FBuffers.clearBuffer;
end;
finally
lvObject.Free;
end;
finally
FCS.Leave;
end;
end;
我们在之前的Demo中应用的是TIOCPJSonStreamDecoder
function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
var
lvJSonLength, lvStreamLength:Integer;
lvData:String;
lvBuffer:array of Char;
lvBufData:PAnsiChar;
lvStream:TMemoryStream;
lvJsonStream:TJsonStream;
lvBytes:TBytes;
lvValidCount:Integer;
begin
Result := nil;
//若是缓存中的数据长度不敷包头长度,解码失败<json字符串长度,流长度>
lvValidCount := inBuf.validCount;
if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then
begin
Exit;
end;
//记录读取地位
inBuf.markReaderIndex;
inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));
lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
//若是缓存中的数据不敷json的长度和流长度<申明数据还没有收取完毕>解码失败
lvValidCount := inBuf.validCount;
if lvValidCount < (lvJSonLength + lvStreamLength) then
begin
//返回buf的读取地位
inBuf.restoreReaderIndex;
exit;
end else if (lvJSonLength + lvStreamLength) = 0 then
begin
//两个都为0<两个0>客户端可以用来作为主动重连应用
TIOCPFileLogger.logDebugMessage(接管到一次[00]数据!);
Exit;
end;
//解码成功
lvJsonStream := TJsonStream.Create;
Result := lvJsonStream;
//读取json字符串
if lvJSonLength > 0 then
begin
SetLength(lvBytes, lvJSonLength);
ZeroMemory(@lvBytes[0], lvJSonLength);
inBuf.readBuffer(@lvBytes[0], lvJSonLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
lvJsonStream.Json := SO(lvData);
end else
begin
TFileLogger.instance.logMessage(接管到一次JSon为空的一次数据恳求!, IOCP_ALERT_);
end;
//读取流数据
if lvStreamLength > 0 then
begin
GetMem(lvBufData, lvStreamLength);
try
inBuf.readBuffer(lvBufData, lvStreamLength);
lvJsonStream.Stream.Size := 0;
lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);
//解压流
if lvJsonStream.Json.B[config.stream.zip] then
begin
//解压
TZipTools.unCompressStreamEX(lvJsonStream.Stream);
end;
finally
FreeMem(lvBufData, lvStreamLength);
end;
end;
end;
//办事端中有三行代码来共同客户端的编码流
lvJSonLength := TNetworkTools.ntohl(lvJSonLength); 
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);    
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes); 
/////
办事端首要完成的功能有
0.断定接管到的数据是否可以进行解码,若是不成以退出,解码不成功。
1.接管json长度,流数据长度
2.接管json数据,接管流数据存入JsonStream.json中,
3.按照json中config.stream.zip进行断定流数据是否须要解压.放入JsonStream.stream中 
4.解码成功返回JsonStream对象。
解码完成后可以看到
lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
if lvObject <> nil then
try //解码成功,调用营业逻辑的处理惩罚办法
dataReceived(lvObject);
………
解码成功调用dataReceived,进行逻辑的处理惩罚。
总结:
办事端的配套客户端的编码器,办事端的编码器配套客户端的。
无论对感情还是对生活,“只要甜不要苦”都是任性而孩子气的,因为我们也不完美,我们也会伤害人。正因为我们都不完美,也因为生活从不是事事如意,所以对这些“瑕疵”的收纳才让我们对生活、对他人的爱变得日益真实而具体。—— 汪冰《世界再亏欠你,也要敢于拥抱幸福》



