No time to look at your code. Sorry.
Could you tell me why you need a thread for TWSocket ? It is asynchronous 
anyway.
If you use a thread, be sure to create TWSocket from the execute method and 
to add a message pump in your thread. Also make sure to /not/ call the 
message pump from any of the event generated by TWSocket, directly or 
indirectly.

--
Contribute to the SSL Effort. Visit http://www.overbyte.be/eng/ssl.html
--
[EMAIL PROTECTED]
http://www.overbyte.be


----- Original Message ----- 
From: "Christian Hinske" <[EMAIL PROTECTED]>
To: <TWSocket@elists.org>
Sent: Wednesday, January 10, 2007 10:14 PM
Subject: [twsocket] udp multicast multiple packages -> TWSocket bug?


Hi everyone, I am quite new to this mailing list, but I hope anyone can 
answer my question.
The problem I am experiencing is as follows:
I get MPEG2 TS-data from a realtime hardware encoder who is multicasting 
into the local network. I am using TWSocket to receive the datapackes. As 
some of you might know, MPEG2 Transport Stream (TS) consists of 188 Byte 
including a 4Byte header, which I can check to see, if the received package 
is OK.
Now, if I am working with only one thread, everything is fine. Once I am 
adding a second thread (OnDataAvailable -> RingBuffer; RingBuffer-> 
Thread2 -> File), it seems that OnDataAvailable isn't executed for every 
event and so buffers the data.
The hardware encoder packs 7 MPEG2 packages into one udp-packet, so the 
standard-size package I get is 1316 Byte. Now, once I am applying the second 
thread, packages get multiples of that (2632), but they all share the same 
characteristics: The first header is damaged, 2nd - 7th are fine, 8th - 
whatever size are empty or random data.
I was wondering if maybe TWSocket does not copy the second 1316Byte packet 
into the buffer-area after the first 1316Byte package if OnDataAvailable 
hasn't been called in between, but somewhere into the first bytes, which 
would explain why the first header (with 188 Bytes following is broken).
This is how parts of my debugfile look like

1316 Byte udp package received
header 1 - OKAY, pid 256 -> payload video
header 2 - OKAY, pid 256 -> payload video
header 3 - OKAY, pid 256 -> payload video
header 4 - OKAY, pid 259 -> payload audio
header 5 - OKAY, pid 256 -> payload video
header 6 - OKAY, pid 256 -> payload video
header 7 - OKAY, pid 256 -> payload video

2632 Byte udp package received
header 1  - ERROR, pid 19440
header 2  - OKAY, pid 256
header 3  - OKAY, pid 256
header 4  - OKAY, pid 256
header 5  - OKAY, pid 256
header 6  - OKAY, pid 256
header 7  - OKAY, pid 256
header 8  - ERROR, pid 0
header 9  - ERROR, pid 0
header 10 - ERROR, pid 0
header 11 - ERROR, pid 0
header 12 - ERROR, pid 0
header 13 - ERROR, pid 0
header 14 - ERROR, pid 0

The code I am using is in analogy to the TWSocket multithreading example. 
Code looks like this:

type
  TPos = RECORD
    Buffersize, Blocksize : Int64;
    CurrentWPos, CurrentRPos, StartPos : Int64;
    Diskrepanz : Int64;
    Richtung : Byte;
  END;
  TStreamThread = class(TThread)
    private
      FAddrDatei     : Pointer;
      BlockMem       : Pointer;
      procedure CalculateDiscrepancy;
      procedure WriteMemToFile;
      procedure ResetPosition;
    public
      FDatenspeicher : Pointer;
      FPos           : ^TPos;
      FAddrLink      : Pointer;
      FAusgabe       : TLabel;
      Ffinalizethread: Boolean;
      FExchangeFile  : Boolean;
      procedure SetLabel;
      procedure Execute; override;
  end;
  TChangeThread = class(TThread)
    private
      procedure ExchangeFile;
      procedure PrepareFile;
    public
      Mode     : Byte;
      procedure Execute; override;
  end;
  TRingBuffer = class(TObject)
    private
      Speicher   : Pointer;
      Groesse    : Int64;
      Start      : Pointer;
      Writer     : Int64;
      Reader     : Int64;
      Diskrepanz : Int64;
      Richtung   : Integer;
    public
      fReadyToWrite : TEvent;
      fReadyToRead  : TEvent;
      CS : TCriticalSection;
      constructor Create;
      procedure GetData(pData : Pointer; pSize : Int64);
      procedure PutData(pData : Pointer; pSize : Int64);
  end;
  var
    Datei                 : ARRAY[0..1] OF TFileStream;
    AddrDatei             : Pointer;
    AddrLink              : Pointer;
    DoExchange            : Byte;
    RootName              : String;
    Socket:TWSocket;
    StreamThread          : TStreamThread;
    ChangeThread          : TChangeThread;
    ExchangeThread        : TChangeThread;

  public
    { Public-Deklarationen }

    DataRes        : Pointer;
    Pos            : ^TPos;
    MyVar          : Integer;
    CritSec        : TCriticalSection;
    Wholesize      : Int64;
    CurrentFileSize: Int64;
    WrittenSize    : Int64;
    Testvariable   : Int64;
    ReceivedPacks  : Int64;
    MaxSize        : Int64;
    PrerunSize     : Int64;
    FileCount      : Integer;
    StreamID       : Integer;
    RecordingID    : Integer;
    ErrorPacks     : Int64;
    procedure DataAvailable(Sender : TObject; Error : Word);
    procedure WriteToMem;
  end;
var
  Form2: TForm2;

implementation
{$R *.dfm}
{procedure TForm2.SetLabel(lname: string);
begin
  StatusLA.Caption:=lname;
end;  }
constructor TForm2.TRingBuffer.Create;
begin
  Inherited Create;
  GetMem(Start,Groesse);
  Writer:=0;
  Reader:=0;
  Richtung:=0;
  CS:=TCriticalSection.Create;
end;
procedure TForm2.TRingbuffer.GetData(pData : Pointer; pSize : Int64);
var zwSize1,zwSize2 : Int64;
begin
  IF fReadyToRead.WaitFor(INFINITE) = wrSignaled THEN
  BEGIN
    fReadyToWrite.ResetEvent;
    try
      CS.Enter;
      Diskrepanz:=Writer+Groesse*Richtung - Reader;
      IF (Diskrepanz-pSize) > 0 THEN
      BEGIN
        IF Reader+pSize <= Groesse THEN
        BEGIN
          copyMemory(pData,POINTER(Int64(Speicher)+Reader),pSize);
          Inc(Reader,pSize);
          IF Reader=Groesse THEN
          BEGIN
            Reader:=0;
            Richtung:=0;
          END;
        END
        ELSE BEGIN
          zwSize1:=Groesse-Reader;
          zwSize2:=Reader+pSize-Groesse;
          copyMemory(pData,POINTER(Int64(Speicher)+Reader),zwSize1);
          copyMemory(POINTER(Int64(pData)+zwSize1),Speicher,zwSize2);
          Reader:=zwSize2;
          Richtung:=0;
        END;
      END;
    finally
      CS.Leave;
      fReadyToWrite.SetEvent;
    END;
  END;
end;
procedure TForm2.TRingbuffer.PutData(pData : Pointer; pSize : Int64);
var zwSize1, zwSize2 : Int64;
begin
  if fReadyToWrite.WaitFor(INFINITE)=wrSignaled THEN
  BEGIN
    fReadyToRead.ResetEvent;
    try
      CS.Enter;
      Diskrepanz:=Writer+Groesse*Richtung - Reader;
      IF Diskrepanz < Groesse THEN
      BEGIN
        IF Writer + pSize <= Groesse THEN
        BEGIN
          CopyMemory(POINTER(Int64(Speicher)+Writer),pData,pSize);
          Inc(Writer,pSize);
          IF Writer=Groesse THEN
          BEGIN
            Writer:=0;
            Richtung:=1;
          END;
        END
        ELSE BEGIN
          zwSize1:=Groesse-Writer;
          zwSize2:=Writer+pSize-Groesse;
          CopyMemory(POINTER(Int64(Speicher)+Writer),pData,zwSize1);
          CopyMemory(Speicher,POINTER(INTEGER(pData)+zwsize1),zwSize2);
          Writer:=zwsize2;
          Richtung:=1;
        END;
      END;
    finally
      CS.Leave;
      fReadyToRead.SetEvent;
    end;
  END;
end;
procedure TForm2.TChangeThread.ExchangeFile;
begin
  Form2.StreamThread.FExchangeFile:=true;
end;
procedure TForm2.TChangeThread.PrepareFile;
BEGIN
  Inc(Form2.FileCount,1);
  Form2.Datei[Form2.FileCount MOD 
2]:=TFileStream.Create(Form2.rootname+format(fmstring,[Form2.FileCount]),fmCreate);
END;
procedure TForm2.TStreamThread.SetLabel;
begin
  Form2.StatusLA.Caption:='StreamThread';
end;
procedure TForm2.TChangeThread.Execute;
begin
  FreeOnTerminate:=true;
  if Mode=ctmPrepare then
  BEGIN
    Synchronize(PrepareFile);
  END;
  IF Mode=ctmExchange THEN
  BEGIN
    Synchronize(ExchangeFile);
  END;
end;
procedure TForm2.TStreamThread.CalculateDiscrepancy;
begin
  FPos^.Diskrepanz:=(FPos^.Buffersize * FPos^.Richtung + 
FPos^.CurrentWPos) - FPos^.CurrentRPos;
end;
procedure TForm2.TStreamThread.WriteMemToFile;
begin
 //Code
  IF (FAddrDatei <> nil) THEN
  BEGIN
    TFileStream(FAddrDatei^).Write(BlockMem^,FPos^.Blocksize);
    Inc(FPos^.CurrentRPos,FPos^.BlockSize);
    Inc(Form2.WrittenSize,FPos^.BlockSize);
  END
  ELSE
    Form2.StatusLA.Caption:='Error';
end;
procedure TForm2.TStreamThread.ResetPosition;
begin
 //Code
  FPos^.CurrentRPos:=0;
  FPos^.Richtung:=0;
end;

procedure TForm2.TStreamThread.Execute;
var zwPointer : Pointer; zwSize : Int64;
    doneit : Boolean; dummymem : String;
begin
 GetMem(BlockMem,FPos^.BlockSize);
 fExchangeFile:=false;
 WHILE not terminated DO
  BEGIN
    if FExchangeFile THEN
    BEGIN
      Form2.AddrDatei:[EMAIL PROTECTED] MOD 2];
      FreeAndNil(Form2.Datei[(Form2.FileCount-1) MOD 2]);
      FExchangeFile:=False;
    END;
    FAddrDatei:=POINTER(FAddrLink^);

    IF {(FPos^.Diskrepanz>=FPos^.Blocksize) AND} 
(assigned(TFileStream(FAddrDatei^))) AND (Form2.WrittenSize < 
Form2.Wholesize) THEN
    BEGIN
      IF (FPos^.CurrentRPos + FPos^.BlockSize <= FPos^.Buffersize) THEN
      BEGIN
        
CopyMemory(BlockMem,POINTER(FPos^.StartPos+FPos^.CurrentRPos),FPos^.BlockSize);
        Synchronize(WriteMemToFile);
      END; //if fpos^.currentrpos+fpos^.blocksize <= fpos^.buffersize
      IF (FPos^.CurrentRPos >= FPos^.Buffersize) THEN
      BEGIN
        Synchronize(ResetPosition);
      END; // if fpos^.currentrpos >= fpos^.buffersize
    END; // if Diskrepanz>0
  END; //while not terminated
  dispose(BlockMem);
end;

procedure TForm2.WriteToMem;
begin
//Code
end;
procedure TForm2.DataAvailable(Sender : TObject; Error : Word);
var receive, dest,zw : Pointer; rcvsize : Int64;
    zwsize : Int64;
begin
//  StreamThread.Suspend;
  rcvsize:=Socket.RcvdCount;
  IF rcvsize <> Pos^.Blocksize THEN
    Label5.Caption:=inttostr(rcvsize);
  GetMem(receive,rcvsize);
  Socket.Receive(receive,rcvsize);
  IF (rcvsize > 0) THEN
  BEGIN
    Inc(CurrentFilesize,rcvsize);
    Inc(WholeSize,rcvsize);
    Inc(ReceivedPacks,rcvsize DIV Pos^.Blocksize);
    IF (rcvsize MOD Pos^.Blocksize <> 0) THEN
      Inc(ErrorPacks);
    IF ((Pos^.CurrentWPos+rcvsize) <= Pos^.Buffersize) THEN
    BEGIN
      dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
      CritSec.Enter;
        CopyMemory(dest,receive,rcvsize);
      CritSec.Leave;
    END
    ELSE BEGIN
      zwsize:=Pos^.Buffersize - Pos^.CurrentWPos;
      dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
      CritSec.Enter;
        CopyMemory(dest,receive,zwsize);
      CritSec.Leave;
      zw:=POINTER(Int64(receive)+zwsize);
      dest:=POINTER(Pos^.StartPos);
      zwsize:=(Pos^.CurrentWPos+rcvsize)-Pos^.Buffersize;
      CritSec.Enter;
        CopyMemory(dest,zw,zwsize);
        Pos^.CurrentWPos:=zwsize;
        Pos^.Richtung:=1;
      CritSec.Leave;
    END;
    IF Pos^.CurrentWPos+rcvsize > Pos^.Buffersize THEN
    BEGIN
      CritSec.Enter;
        Pos^.CurrentWPos:=0;
        Pos^.Richtung:=1;
      CritSec.Leave;
      Label5.Caption:='Error on size:'+inttostr(rcvsize);
    END
    ELSE BEGIN
      CritSec.Enter;
        Inc(Pos^.CurrentWPos,rcvsize);
      CritSec.Leave;
    END;
  END;
//  StreamThread.Resume;
  IF (CurrentFileSize >= (MaxSize-PreRunSize)) AND (DoExchange = 0) THEN
  BEGIN
    DoExchange:=1;
    ChangeThread:=TChangeThread.Create(true);
    ChangeThread.Mode:=ctmPrepare;
    ChangeThread.Resume;
  END;
  IF (CurrentFileSize >= MaxSize) AND (DoExchange=1) THEN
  BEGIN
    DoExchange:=0;
    CurrentFileSize:=0;
    ExChangeThread:=TChangeThread.Create(true);
    ExChangeThread.Mode:=ctmExchange;
    ExChangeThread.Resume;
  END;
  dispose(receive);
  Label1.Caption:=Inttostr(round((Pos^.CurrentWPos/Pos^.Buffersize)*100));
  Label2.Caption:=Inttostr(round((Pos^.CurrentRPos/Pos^.Buffersize)*100));
  Label3.Caption:=Inttostr(Pos^.Diskrepanz);
  Label4.Caption:=Inttostr(CurrentFileSize);
  Label5.Caption:=Inttostr(FileCount+1);
  Label11.Caption:=Inttostr(WholeSize);
  Label12.Caption:=Inttostr(WrittenSize);
  Label17.Caption:=Inttostr(receivedPacks);
  Label19.Caption:=Inttostr(ErrorPacks);
end;
procedure TForm2.Button2Click(Sender: TObject);
begin
  new(Pos);
  Pos^.Buffersize:=StrToInt(BuffED.Text);
  Pos^.BlockSize:=StrToInt(BlockED.Text);
  GetMem(DataRes,Pos^.Buffersize);
  Pos^.CurrentWPos:=0;
  Pos^.CurrentRPos:=0;
  Pos^.StartPos:=Int64(DataRes);
  Pos^.Richtung:=0;
  Pos^.Diskrepanz:=0;
  FileCount:=0;
  MaxSize:=strtoint(MAXED.Text);
  Prerunsize:=500000;
  wholesize:=0;
  CurrentFileSize:=0;
  WrittenSize:=0;
  ReceivedPacks:=0;
  ErrorPacks:=0;
end;
procedure TForm2.Button1Click(Sender: TObject);
var TestPointer : Pointer;
begin
  WholeSize:=0;
  CurrentFileSize:=0;
  MyVar:=0;
  Socket:=TWSocket.Create(self);
  Socket.Addr:='0.0.0.0';
  Socket.MultiCastAddrStr:=IPED.Text;
  Socket.Port:=PortED.Text;
  Socket.Proto:='udp';
  Socket.MultiCast:=true;
  Socket.ReuseAddr:=true;
  Socket.MultiThreaded:=true;
  Socket.OnDataAvailable:=DataAvailable;

  CritSec:=TCriticalSection.Create;
  CreateDir(DIRED.Text);
  
Rootname:=DIRED.Text+'\'+FILED.Text+format(fmstring,[recordingid])+format(fmstring,[streamid]);
  Datei[0]:=TFileStream.Create(Rootname+format(fmstring,[FileCount]),fmCreate);
  AddrDatei:[EMAIL PROTECTED];
  AddrLink:[EMAIL PROTECTED];

  StreamThread:=TStreamThread.Create(true);
  StreamThread.FDatenspeicher:=DataRes;
  StreamThread.FPos:=Pointer(Int64(Pos));
  StreamThread.FAusgabe:=StatusLA;
  StreamThread.FAddrLink:=AddrLink;
  StreamThread.FreeOnTerminate:=true;

  StreamThread.Ffinalizethread:=false;

  Socket.Listen;
  StreamThread.Resume;
end;
end.
-- 
  Christian Hinske
  Schleißheimerstraße 157
  D-80797 München
  Tel.  :(++ 49 89) 36 0 37 445
  Mobil :(++49)176 234 10 146
  e-mail: [EMAIL PROTECTED]

Der GMX SmartSurfer hilft bis zu 70% Ihrer Onlinekosten zu sparen!
Ideal für Modem und ISDN: http://www.gmx.net/de/go/smartsurfer
-- 
To unsubscribe or change your settings for TWSocket mailing list
please goto http://www.elists.org/mailman/listinfo/twsocket
Visit our website at http://www.overbyte.be 

-- 
To unsubscribe or change your settings for TWSocket mailing list
please goto http://www.elists.org/mailman/listinfo/twsocket
Visit our website at http://www.overbyte.be

Reply via email to