Fluent.Interface


Multi-Threading, Silverlight Sockets & Visualisation Part 2

Having successfully loaded my Visualisation data in Part 1, this article will demonstrate how to send binary encoded data over a socket to a Silverlight application.

Silverlight 2 has supports for asynchronous sockets, which can list to a data-stream connected to servers on a port between 4502 and 4532.  See Dan Wahlin’s good article on setting up a server for sending text messages.

My sample required sending a large amount of data the Model points + triangles, as well as the raw image stream.  This proved to be a challenge with sockets, as there is no well defined beginning or end point of a message, and this volume of data doesn’t can result in an almost constant stream of information.

The solution was to define a magic byte that what identify the start of the ‘message’, which was immediately followed by the size of the Body (as XML meta-data) and the image as binary stream:

Magic(1)

BodySize

ImageSize

Body

Image

Magic(2)

A BinaryWriter was used to send this formatted data to all listening clients:

    private const UInt32 Magic = 0xCAB2FAC;

    private void SendData(string data, byte[] image)
    {
        if (_clientStreams != null)
        {
            lock (_clientStreams)
            {
                List<BinaryWriter> deadWriters = new List<BinaryWriter>();
                // Write the data
                foreach (BinaryWriter writer in _clientStreams)
                {
                    if (writer != null)
                    {
                        try
                        {
                            // Get the body
                            byte[] body = Encoding.UTF8.GetBytes(data);

                            // Get bytes together
                            List<byte> bytes = new List<byte>();
                            bytes.AddRange(BitConverter.GetBytes(Magic));
                            bytes.AddRange(BitConverter.GetBytes((UInt32)body.Length));
                            bytes.AddRange(BitConverter.GetBytes((UInt32)image.Length));
                            bytes.AddRange(body);
                            bytes.AddRange(image);

                            // Write the out
                            writer.Write(bytes.ToArray());
                        }
                        catch (Exception)
                        {
                            // TODO: Trap specific connection exception
                            deadWriters.Add(writer);
                        }
                    }
                }
                // Remove the dead writers
                foreach (BinaryWriter writer in deadWriters)
                {
                    writer.Close();
                    _clientStreams.Remove(writer);
                }
            }
        }
    }

Silver 2 provides an asynchronous socket implementation which once connected can be wired up to receive events when data is transferred:

    void Page_Loaded(object sender, RoutedEventArgs e)
    {
        try
        {
            string host = Application.Current.Host.Source.DnsSafeHost;
            int ip = 4530;

            DnsEndPoint endPoint = new DnsEndPoint(host, ip);
            Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            reader = new VisualisationReader();
            reader.Complete += new EventHandler<VisualisationReadEventArgs>(reader_Complete);

            SocketAsyncEventArgs args = new SocketAsyncEventArgs();
            args.UserToken = socket;
            args.RemoteEndPoint = endPoint;
            args.Completed += new EventHandler<SocketAsyncEventArgs>(OnSocketConnectCompleted);
            socket.ConnectAsync(args);
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine("Load Exception " + ex);
        }
    }

    private void OnSocketConnectCompleted(object sender, SocketAsyncEventArgs e)
    {
        // Read initial data
        byte[] buffer = new byte[1024];
        e.SetBuffer(buffer, 0, buffer.Length);
        if (e.SocketError == SocketError.Success)
        {
            e.Completed -= new EventHandler<SocketAsyncEventArgs>(OnSocketConnectCompleted);
            e.Completed += new EventHandler<SocketAsyncEventArgs>(OnSocketReceive);
            ReadMoreData(e);
        }
    }

    private void OnSocketReceive(object sender, SocketAsyncEventArgs e)
    {
        try
        {
            if (e.BytesTransferred > 0)
            {
                reader.Read(e);
            }
            // Read More
            ReadMoreData(e);
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine("Exception " + ex);
        }
    }

    void ReadMoreData(SocketAsyncEventArgs e)
    {
        Socket socket = (Socket)e.UserToken;
        if (!socket.ReceiveAsync(e))
        {
            // Check this for stack overflow
            OnSocketReceive(socket, e);
        }
    }

    void reader_Complete(object sender, VisualisationReadEventArgs e)
    {
        try
        {
            // Get the data set
            TrainingSetData d = GetData(e.Body);
            // Invoke
            Action<TrainingSetData, byte&#91;&#93;> handler = RenderData;
            this.Dispatcher.BeginInvoke(handler, new object[] { d, e.Image });
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine("Error reading body: " + e.Body);
            System.Diagnostics.Debug.WriteLine(ex);
            throw;
        }
    }

My sample wraps up the logic of interpreting the binary stream into a visualisation reader which calls back to the client once a message has been received.

    /// <summary>
    /// Reads the socket data stream
    /// </summary>
    public class VisualisationReader
    {

        /// <summary>
        /// Token to track the progress of reading the socket data
        /// </summary>
        protected enum Token
        {
            None,
            Header,
            Body,
            Image,
            Complete
        }

        /// <summary>
        /// Header struct to contain the structure of the message
        /// </summary>
        protected struct Header
        {
            public Header(uint body, uint pixels)
            {
                BodyLength = (int)body;
                ImagePixels = (int)pixels;
            }
            public int BodyLength;
            public int ImagePixels;
        }

        public event EventHandler<VisualisationReadEventArgs> Complete;

        public void Read(SocketAsyncEventArgs e)
        {
            e.SetBuffer(buffer, 0, buffer.Length);
            transfered = e.BytesTransferred;

            System.Diagnostics.Debug.WriteLine("Reading... {0}", transfered);

            // Reset the number of bytes read
            read = 0;

            while (read < transfered)
            {
                Token token = NextToken();
                if (token == Token.Complete)
                {
                    OnComplete(new VisualisationReadEventArgs(body.ToString(), image));
                    ResetToken();
                    System.Diagnostics.Debug.WriteLine("Break...");
                    break; // Break out of this read
                }
                else if (token == Token.None)
                {
                    // Exit if we didn't match anything
                    break;
                }
            }

        }

        /// <summary>
        /// Raises the SomeEvent event
        /// </summary>
        protected virtual void OnComplete(VisualisationReadEventArgs e)
        {
            EventHandler<VisualisationReadEventArgs> handler;
            lock (this)
            {
                handler = Complete;
            }
            if (handler != null)
            {
                handler(this, e);
            }
        }

        protected Token NextToken()
        {
            if (currentToken == Token.None)
            {
                // Wait until i get a magic byte at the begining of the buffer
                if (BitConverter.ToUInt32(buffer, 0) == Magic)
                {
                    currentToken = Token.Header;
                    System.Diagnostics.Debug.WriteLine("Read Magic");
                    read = 4;
                }
            }
            else if (currentToken == Token.Header)
            {
                // Build the header for body + image size
                header = new Header(BitConverter.ToUInt32(buffer, 4), BitConverter.ToUInt32(buffer, 8));
                read += 8;

                // Initialize for body
                currentToken = Token.Body;
                readTo = read + header.BodyLength;
                body = new StringBuilder(header.BodyLength);

                System.Diagnostics.Debug.WriteLine("Read Header");
            }
            else
            {
                // ... Implementation Detail for Body and Image ...
            }
        }

        private void ResetToken()
        {
            currentToken = Token.None;
        }

    }

That complete call back contains all the necessary information for rendering the image and overlaying the data points on the sliverlight canvas.  Part 3 will cover this in more detail including an alternative implementation for downloading an image on demand over a RESTful WCF web service.

The source is avaialble for download.

Advertisements

Multi-Threading, Silverlight Sockets & Visualisation Part 1

When looking for ideas to build a socket-aware sample silverlight application, I came across the statistical shape modelling of research of Newcastle professor Tim Coote’s.  He has available on his website some tools for marking up a model with points, and mapping related images.

 

I used Tim’s sample face dataset, and created a Silverlight application that receives a stream of socket information containing the image data and polygons for rendering the ‘mask’

Silverlight Visualisation

Before getting to the visualisation however, I will cover off loading the dataset, and the multi-threading performance tests I did along the way.

The face dataset consists of three directories

·         images

·         points

·         models

A model is defined by a series of images, each of which map to a points file which contain a fixed array of X,Y co-ordinates.  These points map to triangles that form the polygons that are drawn on top of the image for each Timestep in the dataset.

Model contains the training set:

training_set:

{

  107_0764.pts : 107_0764.jpg

  107_0766.pts : 107_0766.jpg

  107_0791.pts : 107_0791.jpg

  107_0792.pts : 107_0792.jpg

}

Points for for each time step contains 68 points at slightly different co-ordinates.

version: 1

n_points: 68

{

249.599 153.934

246.122 205.482

384.186 323.087

394.303 235.871

}

The triangles files indicates which points are joined to form the pologyons.

version: 1

n_triangles: 113

{

 { v1: 0 v2: 21 v3: 27 }

 { v1: 27 v2: 0 v3: 1 }

 { v1: 58 v2: 6 v3: 5 }

 { v1: 61 v2: 58 v3: 60 }

}

My sample contains a Loader responsible for parsing these files as and putting the object model into an in-memory cache.  Given the training data set contains a sequential list of files; loading of this information is inherently parallelisable.

When dealing with multi-threading scenarios, resource locking also becomes an issue, so I constructed a unit test which compares the following thread models:

1.       Single Threaded

2.       Pooled: Multi-Threaded using ThreadPool

3.       Parallel: Multi-Threaded using Microsoft’s new Task Parallel Library (TPL)

With the following locking models

·         Read/Write Lock

·         Write Lock

Following is the sample code for the Pooled method

 
private void RunPooled(Action threadProc)
{
    // Set the stop date
    stop = DateTime.Now.AddSeconds(runFor);

    // Use an event to wait for all work items to complete
    int count = threadCount;
    using (var mre = new ManualResetEvent(false))
    {
        for (int i = 0; i < threadCount; i++)
        {
            ThreadPool.QueueUserWorkItem((state) =>
            {
                // Execute the method
                Thread.CurrentThread.Name = String.Format("Thread#{0:00}", state);
                threadProc();
                if (Interlocked.Decrement(ref count) == 0)
                {
                    mre.Set();
                }
            }, i);
        }

        // Wait for all work to complete
        mre.WaitOne();
    }
}

Following is an example using Parallel.Do method (Note the Parallel.For method seems to perform some internal ‘optimisations’ which results in fewer worker thread’s being spun up).

private void RunParalell(Action threadProc)
{
// Set the stop date
stop = DateTime.Now.AddSeconds(runFor);

// Create the list of actions
List actions = new List();
for (int i = 0; i < threadCount; i++) { actions.Add(threadProc); } // Run the series of actions Parallel.Do(actions.ToArray()); } [/sourcecode]

The test spins up a series of threads to load the dataset which is will randomly read or write to the in-memory cache which is also subject to time outs.  The following configure was used:

Cache Expiry

200ms

Read Lock

20ms

Write Lock

100ms

Thread Sleep

50ms

Read/Write Ratio

9:1 (9 Reads for every 1 Write)

My machine configuration is Del MultiPlex 745, Intel Core 2, 2,394MHz, 4GM RAM running Windows 2003 SP 2.  The average execution time (ms) for each load request is summarised in the chart below:

Silverlight threading performance

Microsoft’s new parallel library wins out on a 2:1 ratio over the single treaded equivalent, and the Read/Write lock proves more efficient as it locks the resource less often given Read’s are more frequent the Writes as configured by the test ratio.

For complete test results, please see the complete spreadsheet or download the sample to play for yourself.

Stay tuned for Part 2 where I will discuss the Silverlight socket implementation, including the complexity of sending multi-part chunks of data for the large images.


.NET open source projects embrace C#3

Every now and then you come across an open source initiative that has taken the new C# lambdas language extensions and run with them.  My two favourite at the moment are AutoFac and Moq.  And yes they are an agile developers favourite toolbox items – a DI container and Mocking framework respectively.

AutoFac enables compile-time configuration of the container, and supports scoping object creation, so that you could have a different instance per transaction, per request etc.  And to top that off it has bench marked very well especially when using the lambdas expressions.

Moq brings the fluent interface of RhinoMocks into the lambdas world, and is equally impressive.

Both libraries have good documentation and are open sourced at google code.


Managed Extensibility Framework / PostSharp

Dependency Injection and ‘Plugin’ frameworks continue to be all the rage.  Following on the back of P&P’s release of the Unity DI library Microsoft has assembled the Application Framework Core team to focus on developing a Managed Extensibility Framework which is to focus amongst other things on a consistent way to perform Dependency Injection.

Already there are a first full of libraries out the including Unity, Castle, SturctureMap, Spring.NET (and don’t forget XAML!). It is becoming increasingly difficult to choose the appropriate one.

PostSharp offers an interesting Aspect Oriented approach to wearing in code at the IL layer post-build; Enabling unobtrusive injection of hooks (such as tracing) by simply declaring .NET attributes and engaging the Laos plugin.

More recently the Entity Framework contrib project has leveraged PostSharp to inject the IPoco interfaces that are (annoyingly) required to support hooking into the ADO.NET extension. Ruurd Boeke has a series of in-depth articles on getting up a running with this framework.

Perhaps NHibernate will still hold it’s own until v2 of the entity framework is released!