Galaxy Communicator Tutorial:

Setting Up a Brokered Audio Connection

License / Documentation home / Help and feedback

Up to this point, all the communication between servers has been mediated by the Hub. This arrangement has its advantages: for instance, the Hub provides a central location for flow of control and routing. However, it also has its disadvantages: in order for a message to get from one server to another, it has to be decoded, analyzed, and reconstructed in the Hub. While this process appears to be quite efficient, there are circumstances in which you might prefer to send data directly: for example, in situations involving high bandwidth or streaming, such as audio input or output.

To support this alternative, the Galaxy Communicator infrastructure supports a peer-to-peer connection called a broker. The Hub mediates the establishment of this connection, but all data which flows through the connection goes directly from server to server. In this lesson, we'll learn how to set up a broker connection and how it works.

Once you feel comfortable with this lesson, you can consult the broker reference documentation.


Recognizer, Synthesizer and Audio

Up to this point, we've examined four servers in the toy travel demo: the Parser, Backend and Generator servers, which represent the simplest type of server, and the Dialogue server, which represents the next level of complexity. The first three servers simply send responses to dispatch functions, while the Dialogue server sends new messages to the Hub and occasionally waits for the response. The last three servers take advantage of all these tools, and furthermore interact with each other via broker connections.

In addition, the Audio server also exhibits the complexities of UI programming, which we'll explore in the next lesson.


Brokering in four steps

The setup and operation of brokers can be broken down into four steps. We'll describe and illustrate these steps briefly, and then look at some code. We'll focus on the interaction between the Synthesizer and Audio servers as an example.

Step 1: Establish the broker listener

In the first step, the server which will be the source of the broker data establishes a listener for the broker. This listener, in most cases, is the same listener that's listening for connections from the Hub; the protocol is constructed so that the listener can distinguish between the two types of connections. The server assigns each broker a unique ID, so that the server can listen for multiple brokers simultaneously. The server also assigns each broker a timeout; it will accept multiple connections for the broker, until the timeout is reached.

Step 2: Notify the Hub and client

In the second step, the source server sends a new message to the Hub which contains the broker contact information. This information consists of the host, port, datatype and unique ID associated with this particular broker, and is encapsulated in an object called a broker proxy. The message which contains the proxy is a message like any other, and can contain additional information (such as encoding format and sample rate for audio).

Step 3: Establish the broker connection, transmit data

When the target server receives this information, it establishes a connection to the source server using information in the proxy, and establishes a callback to receive the data. The source server can now send data over the connection, and the callback will be invoked in the target server whenever data is received.

Step 4: Process data, shut down broker listener

Finally, when the source server is done sending data, it notifies the target server that it's done, and the broker connection is terminated. The target server does whatever it wants to do with the data (e.g., send audio to the audio device). When the source server's broker reaches its timeout, it stops accepting connections for that broker; when all the connections for the broker are done, the source server shuts down the broker.
Now that we've seen these steps in the abstract, let's take a look at each one of them in detail.


Brokering details

Source server: broker setup, message dispatch

In the first and second brokering steps, the source server sets up a broker listener and sends a message to the Hub announcing the broker. Here's how that happens.

There's more than one way to do this, but in this particular case, we use the function GalSS_ProxifyObjectType. This function creates the broker listener, and wraps a proxy around it. The first argument of this function is the call environment. The connection this call environment contains, and the server it's associated with, will host the broker and its listener. The second argument is the type of data we'll send across this broker connection; in this case, it's arrays of 16-bit integers. The next argument is how often (or whether) to allow the toplevel Communicator loop to poll this broker to see if there's anything to do; 0 means use the default poll value. The fourth argument is the timeout, in seconds; after this amount of time, the broker will stop accepting new connections.

After the new proxy is created, it's inserted into the frame like any other data, and the frame is sent to the Hub and then freed, just like any other message.

Hub: broker client notification

The Hub portion of the process is the simplest, and you can probably guess what it looks like:
PROGRAM: FromSynthesizer

RULE: :proxy --> Audio.Play
IN: :proxy :encoding_format :sample_rate

The Hub matches the incoming message with the appropriate program, shown here, and checks the rule. The :proxy key is present. So the rule fires, and the relevant data is sent to the Audio.Play dispatch function.

Target server: set up broker client, establish callback

When the Audio server invokes the Audio.Play dispatch function, it creates a broker client, as follows:

Again, there are a few ways to do this, but here we use the function GalSS_Unproxify. This function creates the broker client and establishes the callbacks. It requires a call environment object (in case the callback needs the environment to write new messages to the Hub), the proxy, and three possible callbacks: one to handle the data, one which is called when the data transfer is done, and one to be called if the connection aborts before completing. (There are also a few other arguments we won't discuss.) Notice that we copied the broker proxy before we established the connection; this is because, like all other data in frames, the original proxy will be freed when the dispatch function exits and the incoming message is freed.

At this point, the source and target servers have established a broker connection, and the broker client can begin to consume data.

About broker data

When we discussed frames and objects, we talked about the object types that could appear in frames, and concentrated on frames, strings, integers and lists. We suggested at that point that there were many other datatypes which could appear in frames, including arrays of integers of various sizes. Any type that can appear in a frame can be sent across a broker connection, and vice versa, but we won't be looking at frames, string, integers and lists in our broker example; we'll be looking at arrays of integers.

When data is written to a broker server, it is encoded and cached in an output queue. If any broker clients are currently connected, it is also written to the client; and when a new client connects, the server transmits the contents of the output queue to the new client. In other words, all clients which connect to the broker server are guaranteed to see all the same data in the same order, no matter when they connect. This is one of the reasons that broker servers are set up to expire; if a broker server didn't expire, it would never be possible to free its output queue.

The two ends of the broker connection

On the source server side, the server writes data to the broker, and then indicates that it's done, approximately as follows:
  void *data;
  int num_samples;
  GalSS_BrokerProxy *bp;
  SynthesisRecord *s;
 
  /* ... */
 
  data = PollSynthesis(s, &num_samples);
  while (data) {
    GalSS_ProxyArrayAdd(bp, data, num_samples);
    free(data);
    data = PollSynthesis(s, &num_samples);
  }
 
  if (SynthesisIsDone(s)) {
    GalSS_ProxyDone(bp);
  }
  /* ... */
There are two functions involved in this example. On the target server side, the data callback function is invoked each time a block of data is received (each block corresponds to a single call to GalSS_ProxyArrayAdd). The data callback function looks approximately like this:
static void __AudioOutputCallback(GalSS_Environment *env,
                                  Gal_ObjectType o_type,
                                  Gal_Object elt, void *caller_data)
{
  AudioPkg *p = (AudioPkg *) caller_data;
  int n_samples = 0;
  void *data = Gal_Int16Value(elt, &n_samples);
 
  printf("[Audio data to user (%d samples)]\n", n_samples);
  fflush(stdout);
  PlayAudio(p->device, data, n_samples);
  Gal_FreeObject(elt);
}
Each data callback function has the same signature. The first argument is the environment passed to GalSS_Unproxify. The second argument is the data type of the broker proxy. The third object is a Gal_Object corresponding to the data, and the final argument is caller data which was passed to GalSS_Unproxify, in case you need to refer to your own structures.

So when the target server receives a block of data, it invokes the data callback. The data is allocated as a new Gal_Object before it's passed to the data callback function, so once it's done with the data, it frees it. This happens for each block of data (that is, each GalSS_ProxyArrayAdd call on the source side), until the broker client receives the termination message, at which point it will do whatever it's supposed to do when it's done, and terminate.

An example

Let's watch a broker in action. This is slightly complicated by the fact that the unit tester cannot host broker servers or clients. So we need to use the unit tester, acting as a server, to send a message to the Hub which will cause the Synthesizer server to produce a broker connection. This is actually pretty simple; we'll just send a Synthesizer.Synthesize message to the Hub, and populate its program file with the program we saw here, to handle the new message from the Synthesizer server.
[Brokering exercise 1]

Unix
:

% process_monitor $GC_HOME/tutorial/brokering/synth.config

Windows:

C:\> python %PM_DIR%\process_monitor.py %GC_HOME%\tutorial\brokering\synth.config

Select "Process Control --> Restart all", then select "Send new message" in the unit tester. Select the message named Synthesizer.Synthesize, and press OK. You'll see the result in the Audio pane:
[Audio client pane]

[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (1024 samples)]
[Audio data to user (184 samples)]
[Audio data to user is finalized (14520 samples).]

Each of these printouts corresponds to a single call to the data callback function. The Hub shows how the information was conveyed from the source server to the target server:
[Hub pane]

----------------[  3]----------------------
{c FromSynthesizer
   :proxy "[broker proxy: call ID 129.83.10.107:6113:0, host 129.83.10.107, port 15500, type GAL_INT_16]"
   :sample_rate 8000
   :encoding_format "linear16"
   :session_id "Default"
   :tidx 3 }
--------------------------------------------

Found operation for token 3: Audio.Play
Serving message with token index 3 to provider for Audio (id 2)
---- Serve(Audio@<remote>:-1, token 3 op_name Play (session locked))
Got reply from provider for Audio (id 2) : token 3

----------------[  3]----------------------
{c FromSynthesizer
   :proxy "[broker proxy: call ID 129.83.10.107:6113:0, host 129.83.10.107, port 15500, type GAL_INT_16]"
   :sample_rate 8000
   :encoding_format "linear16"
   :session_id "Default"
   :tidx 3 }
--------------------------------------------

Destroying token 3

The FromSynthesizer message constructed by the Synthesizer server contains the proxy, and the program shown here sends the appropriate message to the Audio server. Finally, the Synthesizer pane shows the evidence of the broker client connecting, the unique ID matching, and the transmission of the special conclusion message:
[Synthesizer pane]

Accepted connection from 129.83.10.107 (socket 6)
Found matching out broker for incoming request 129.83.10.107:6113:0
Destroying connection to 129.83.10.107 (socket 6)
Done sending data from out broker

Select "File --> Quit" to end this exercise.


Summary

In this lesson, you've learned about the concept and the details of broker connections, including: In the next lesson, we'll learn about the final and most complex type of server: one which monitors user interaction.

Next: Creating a UI server


License / Documentation home / Help and feedback
Last updated August 8, 2002