Use the Kaazing Gateway .NET AMQP Client Library

In this procedure, you will learn how to use the signed Kaazing Gateway .NET AMQP Client Library and the supported APIs. The steps in this procedure show you how to set up your development environment and add the major coding steps.

The Kaazing Microsoft .NET WebSocket API supports the following deployment scenarios:

Note: There is no support for Windows CE.

Before You Begin

This procedure is part of Checklist: How to Build Microsoft .NET Clients.

Note: Learn about supported browsers, operating systems, and platform versions in the Release Notes.

Taking a Look at the Microsoft .NET AMQP Tutorial App

Before you start developing, take a look at the demonstrations built with the .NET versions of the AMQP client library. To see the application in action, perform the following steps:

  1. Clone or download the Kaazing .NET AMQP tutorial available on GitHub at https://github.com/kaazing/dotnet.client.tutorials.
  2. Navigate to the AMQP app for Windows Desktop at dotnet.client.tutorials/amqp/WindowsDesktop.
  3. Double-click AmqpDemo.sln. The solution opens in Visual Studio.
  4. In Solution Explorer, right-click AmqpDemo and click Build.
  5. To run the app, click the Start arrow. The desktop app appears in a new window.
  6. In the app, click Connect to connect to the publicly available Kaazing WebSocket Gateway and AMQP service at URL wss://demos.kaazing.com/amqp.
  7. Click Publish to publish a message to an exchange on the AMQP broker via the Kaazing WebSocket Gateway.

To Use the Kaazing Gateway .NET AMQP Client Library

To demonstrate the Kaazing WebSocket .NET SDK, let's look at a simple .NET AMQP desktop application that uses the Kaazing WebSocket Gateway and its amqp.proxy service to publish messages to exchanges over WebSocket. This is the same .NET AMQP desktop application that is available on GitHub as part of the Kaazing .NET tutorials at https://github.com/kaazing/dotnet.client.tutorials/tree/develop/amqp/WindowsDesktop.

  1. Install .NET Framework 4.6.2. To see if you have this version installed, see How to: Determine Which .NET Framework Versions Are Installed.
  2. Install a .NET Integrated Development Environment (IDE). This procedure assumes that you are using Microsoft Visual Studio or the free Visual Studio Community.

    Note: You can develop .NET Framework applications in any of the .NET programming languages. Microsoft Visual C# is used in the code examples in this document.

  3. Download the Kaazing Microsoft .NET SDK NuGet package file (.nupkg) from kaazing.com/download.
  4. Open Visual Studio.
  5. Create a new project. Click File, click New, and then click Project.
  6. Click the Installed navigation heading, expand Templates, expand Visual C#, and click Windows Desktop.
  7. Click Windows Forms Application.
  8. At the top of the dialog, select .NET Framework 4.6.2.
  9. In Name, enter AmqpDemo and click OK. Visual Studio creates the new AmqpDemo project.
  10. Install the Kaazing Microsoft .NET SDK.
    1. Click TOOLS, click NuGet Package Manager, and then click Package Manager Settings.
    2. In the navigation, click Package Sources.
    3. Click the plus icon to add a new source.
    4. In Name, enter Kaazing.
    5. In Source, click the browse button (...).
    6. Locate the folder containing the .nupkg file for the Kaazing Microsoft .NET SDK and click Select.
    7. Click OK.
    8. Right-click the AmqpDemo project, and click Manage NuGet Packages.
    9. Click Online, click Kaazing, and then click Install.
    10. Click Close.
    11. In your project, expand the References element to see the Kaazing .NET SDK libraries.
  11. Review the common .NET AMQP programming steps.

    Now that you have set up your environment to develop .NET applications using the Gateway's AMQP client library, you can start creating your application. You can either build a single application that both publishes and consumes messages, or create two different applications to handle each action. Refer to the Kaazing.AMQP documentation for the complete list of all the AMQP command and callback functions.

    The common .NET AMQP programming steps are:

    1. Create the AmqpClient object
    2. Connect to an AMQP broker
    3. Create channels
    4. Declare an exchange
    5. Declare a queue
    6. Bind an exchange to a queue
    7. Publish messages
    8. Consume messages
    9. Use transactions
    10. Control message flow
    11. Handle exceptions
  12. Create the AmqpClient object.

    First, create an AmqpClient object. Before you create the client object, add the following using directives to your application's .cs file:

    using Kaazing.HTML5;
    using Kaazing.Security;
    using Kaazing.AMQP;

    Add the following member variables, and note the AmqpClient variable that will be used later:

    namespace Kaazing.AMQP.Demo
    {
        public partial class AMQPDemoForm : Form
        {
            private const int LOG_LIMIT = 50;
    
            private AmqpClient       client = null;
            private BasicChallengeHandler basicHandler;
            private AmqpChannel      publishChannel = null;
            private AmqpChannel      consumeChannel = null; 
            private AmqpChannel      txnPublishChannel = null;
            private AmqpChannel      txnConsumeChannel = null;
            private string           queueName = "queue" + new Random().Next();
            private string           txnQueueName = "txnqueue" + new Random().Next(); 
            private string           exchangeName = "demo_exchange";
            private string           txnExchangeName = "demo_txn_exchange";
            private string           routingKey = "broadcastkey";
            private bool             terminated = false;
            private Queue<String>    logLines = new Queue<String>();
             
            /// The code for the subsequent steps goes here.
        }
    }
    

    Next, create an instance of the AmqpClient object as shown in the following example.

    client = new AmqpClient();

    Now that you have created an instance of the AmqpClient object, you can use the AMQP protocol commands. To handle open, close and error events, add event handlers, as shown in the following example.

    client.OpenEvent += new AmqpEventHandler(ConnectedHandler);
    client.CloseEvent += new AmqpEventHandler(CloseHandler);
    client.ErrorEvent += new AmqpEventHandler(ConnectionErrorHandler);
    
  13. Connect to an AMQP broker.

    You must connect and log in to an AMQP broker. The client generally manages all of its communication on a single connection to an AMQP broker. You establish a connection to an AMQP broker by passing in the broker address, a user name and password, the AMQP version you want to use, and, optionally, a virtual host name (the name of a collection of exchanges and queues hosted on independent server domains). In the following example, the parameters are passed in when you call the Connect() method.

    private void ConnectButton_Click(object sender, EventArgs e)
    {
        ConnectButton.Enabled = false;
        client = new AmqpClient();
        client.ChallengeHandler = basicHandler;
        client.OpenEvent += new AmqpEventHandler(ConnectedHandler);
        client.CloseEvent += new AmqpEventHandler(CloseHandler);
        client.ErrorEvent += new AmqpEventHandler(ConnectionErrorHandler);
        ConnectionStatusValueLabel.Text = "CONNECTING";
    
        Log("\n");
        Log("CONNECTING: " + LocationText.Text );
    
        UpdateTextBoxes(false);
        string virtualHost = VirtualHostText.Text;
        string locText = LocationText.Text;
    
        if ((locText == null) || (locText.Length == 0))
        {
            locText = "wss://demos.kaazing.com/amqp";
        }
    
        try
        {
            client.Connect(locText, virtualHost, "guest", "guest");
        }
        catch (Exception ex)
        {
            UpdateUI(false);                
            Log("Exception: " + ex.Message);
        }
    }
    

    In this example, the parameters passed to Connect() are the URL (wss://demos.kaazing.com/amqp when no location is entered), the virtual host, the user name (guest), and the password (guest).

    Note: The Gateway supports AMQP version 0-9-1.
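The fallback to a default URL in the ConnectButton_Click handler above can be factored into a small helper. The following is a minimal sketch, not part of the Kaazing library: the class name LocationResolver and the method Resolve are hypothetical, and the scheme check assumes that the Gateway is always reached over ws:// or wss://, as in the URLs used in this procedure.

```csharp
using System;

// Hypothetical helper (not part of the Kaazing API): picks the URL to pass to
// Connect() and rejects anything that is not a WebSocket URL.
public static class LocationResolver
{
    // Default taken from the tutorial app in this procedure.
    public const string DefaultLocation = "wss://demos.kaazing.com/amqp";

    public static string Resolve(string input)
    {
        // Fall back to the public demo Gateway when the text box is empty.
        string candidate = string.IsNullOrWhiteSpace(input) ? DefaultLocation : input.Trim();

        Uri uri;
        if (!Uri.TryCreate(candidate, UriKind.Absolute, out uri))
        {
            throw new ArgumentException("Not an absolute URL: " + candidate);
        }

        // Assumption: only ws:// and wss:// are meaningful here.
        if (uri.Scheme != "ws" && uri.Scheme != "wss")
        {
            throw new ArgumentException("Expected a ws:// or wss:// URL but got: " + candidate);
        }

        return candidate;
    }
}
```

With a helper like this, the handler could pass LocationResolver.Resolve(LocationText.Text) to Connect() and report a malformed URL before any network activity starts.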

  14. Create channels.

    Once a connection to an AMQP broker has been established, the client must create a channel to communicate with the broker. A channel is a bi-directional connection between an AMQP client and an AMQP broker. AMQP is multi-channeled, which means that channels are multiplexed over a single network socket connection. Channels are lightweight and consume few resources, so they are used in AMQP's exception handling mechanism: channels are closed when an exception occurs. The following example shows how you can create two channels (one for publishing to an exchange and one for consuming from a queue):

    publishChannel = client.OpenChannel();
    consumeChannel = client.OpenChannel();

    Once you have created the channels, you can add event handlers for various channel events as shown in the following example.

    publishChannel.OpenEvent += new AmqpEventHandler(PublishChannelOpenHandler);
    publishChannel.CloseEvent += new AmqpEventHandler(PublishChannelCloseHandler);
    publishChannel.DeclareExchangeEvent += new AmqpEventHandler(DeclareExchangeHandler);
    
    consumeChannel.OpenEvent += new AmqpEventHandler(ConsumeOpenHandler);
    consumeChannel.CloseEvent += new AmqpEventHandler(ConsumeCloseChannelHandler);
    consumeChannel.ConsumeEvent += new AmqpEventHandler(ConsumeHandler);
    consumeChannel.BindQueueEvent += new AmqpEventHandler(BindQueueHandler);
    consumeChannel.DeclareQueueEvent += new AmqpEventHandler(DeclareQueueHandler);
    consumeChannel.FlowEvent += new AmqpEventHandler(FlowHandler);
    consumeChannel.MessageEvent += new AmqpEventHandler(MessageHandler);

    In this example, each of the event handlers has an associated function that processes the AMQP event. The following is an example of a PublishChannelOpenHandler function:

    /*
     * Publish Channel Handlers
     */
    private void PublishChannelOpenHandler(object sender, AmqpEventArgs e)
    {
       this.BeginInvoke((InvokeDelegate)(() =>
       {
           Log("OPENED: Publish Channel");
    
           publishChannel.DeclareExchange(exchangeName, "fanout", false, false, false, null);
       }));
    }
    
  15. Declare an exchange.

    AMQP messages are published to exchanges. Messages contain a routing key that contains the information about the message's destination. The exchange accepts messages and their routing keys and delivers them to a message queue. You can think of an exchange as an electronic mailman that delivers the messages to a mailbox (the queue) based on the address on the message's envelope (the routing key). Exchanges do not store messages.

    Note: AMQP brokers reserve the use of the System exchange type, so applications should not use it.

    AMQP defines different exchange types. Some of these exchange types (Direct, Fanout, and Topic) must be supported by all AMQP brokers while others (Headers and System) are optional. AMQP brokers can also support custom exchange types. The following are the different types of exchanges:

    • Direct—Messages are sent only to a queue that is bound with a binding key that matches the message's routing key.
    • Fanout—Messages are sent to every queue that is bound to the exchange.
    • Topic—Messages are sent to a queue based on categorical binding keys and wildcards.
    • Headers—Messages are sent to a queue based on their header property values.
    • System—Messages are sent to system services.
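To make the difference between the three mandatory exchange types concrete, the following sketch models their routing rules in memory. It is an illustration only and does not use the Kaazing library or talk to a broker: the class ExchangeRoutingSketch and its members are hypothetical names, Headers and System exchanges are left out, and the topic rules assume the standard AMQP 0-9-1 wildcards (* for exactly one word, # for zero or more words).

```csharp
using System;
using System.Collections.Generic;

// Simplified in-memory model of AMQP routing; it does not talk to a broker.
public static class ExchangeRoutingSketch
{
    // A binding ties a queue name to a binding key.
    public sealed class Binding
    {
        public string Queue;
        public string Key;
        public Binding(string queue, string key) { Queue = queue; Key = key; }
    }

    // Returns the queues that would receive a message with the given routing key.
    public static List<string> Route(string exchangeType, string routingKey, IEnumerable<Binding> bindings)
    {
        var queues = new List<string>();
        foreach (Binding b in bindings)
        {
            bool match;
            switch (exchangeType)
            {
                case "fanout": match = true; break;                 // routing key is ignored
                case "direct": match = b.Key == routingKey; break;  // exact match only
                case "topic": match = TopicMatches(b.Key.Split('.'), 0, routingKey.Split('.'), 0); break;
                default: throw new ArgumentException("Unsupported exchange type: " + exchangeType);
            }
            if (match && !queues.Contains(b.Queue)) queues.Add(b.Queue);
        }
        return queues;
    }

    // Topic rules: '*' matches exactly one word, '#' matches zero or more words.
    private static bool TopicMatches(string[] pattern, int p, string[] key, int k)
    {
        if (p == pattern.Length) return k == key.Length;
        if (pattern[p] == "#")
        {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = k; next <= key.Length; next++)
            {
                if (TopicMatches(pattern, p + 1, key, next)) return true;
            }
            return false;
        }
        if (k == key.Length) return false;
        if (pattern[p] == "*" || pattern[p] == key[k]) return TopicMatches(pattern, p + 1, key, k + 1);
        return false;
    }
}
```

For example, with bindings for the keys stock.nyse.ibm, stock.*.ibm, and stock.#, a message with the routing key stock.nyse.ibm reaches only the first queue on a direct exchange but all three queues on a topic or fanout exchange.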

    Exchanges can be durable, meaning that the exchange survives broker shut-down and must be deleted manually, or non-durable (temporary), meaning that the exchange lasts only until the broker is shut down. Finally, to check if an exchange exists on the AMQP broker (without actually creating it), you can create a passive exchange. The following example shows how you can create a fanout exchange on the publish channel:

    private void PublishChannelOpenHandler(object sender, AmqpEventArgs e)
    {
        this.BeginInvoke((InvokeDelegate)(() =>
        {
            Log("OPENED: Publish Channel");
    
            publishChannel.DeclareExchange(exchangeName, "fanout", false, false, false, null);
        }));
    }
    

    Note: In this example, the three false arguments are the Boolean values for passive, durable, and noWait. The final null argument means that no custom parameters are passed in.

    After the exchange is created successfully, a DeclareExchangeEvent event is raised, which calls the previously registered event handler DeclareExchangeHandler.

  16. Declare a queue.

    AMQP messages are consumed from queues. You can think of a queue as a mailbox; messages addressed to a particular address (the routing key) are placed in the mailbox for the consumer to pick up. If multiple consumers are bound to a single queue, only one of the consumers receives the message (the one that picked up the mail).

    To check if a queue exists on the AMQP broker (without creating it), you can create a passive queue. Additionally, queues can be marked exclusive, which means that they are tied to a specific connection. If a queue is marked exclusive, it is deleted when the connection on which it was created is closed.

    Queues can be durable, meaning that the queue survives broker shut-down and must be deleted manually, or non-durable (temporary), meaning that the queue lasts only until the broker is shut down. Queues can also be marked auto delete, which means that the queue is automatically deleted when it is no longer in use. The following example shows how you can create a queue on the consume channel:

    consumeChannel.DeclareQueue(queueName, false, false, false, false, false, null);
    

    Note: In this example, the five false arguments are Boolean values for the queue options described above. The final null argument means that no custom parameters are passed in.

    After the queue is created successfully, a DeclareQueueEvent event is raised, which calls the previously registered event handler DeclareQueueHandler.

  17. Bind an exchange to a queue.

    Once you have created an exchange and a queue in AMQP, you must bind—or map—one to the other so that messages published to a specific exchange are delivered to a particular queue. You bind a queue to an exchange with a routing key as shown in the following example.

    consumeChannel.BindQueue(queueName, exchangeName, routingKey, false, null);

    After the exchange is bound to the queue successfully, a BindQueueEvent event is raised, which calls the previously registered event handler BindQueueHandler.

  18. Publish messages.

    Messages are published to exchanges. The established binding rules (routing keys) determine to which queue a message is delivered. Messages have content that consists of two parts:

    1. Content Header—A set of properties that describes the message
    2. Content Body—A blob of binary data

    Additionally, messages can be marked mandatory to send a notification to the publisher in case a message cannot be delivered to a queue. You can also mark a message immediate so that it is returned to the sender if the message cannot be routed to a queue consumer immediately. The following example shows how the content body of a message is added to a buffer (AMQP uses a binary message format) and published to an exchange using the publish channel:

    private void PublishBasic(string text)
    {
        ByteBuffer buffer = new ByteBuffer();
        buffer.PutString(text, System.Text.Encoding.UTF8);
        buffer.Flip();
        AmqpProperties amqpProperties = new AmqpProperties();
        amqpProperties.MessageId = "abcdxyz1234pqr";
        amqpProperties.CorrelationId = "23456";
        amqpProperties.UserId = UserIdText.Text;
        amqpProperties.ContentType = AmqpProperties.TEXT_PLAIN;
        amqpProperties.DeliveryMode = 1;
        amqpProperties.Priority = 6;
        amqpProperties.Timestamp = DateTime.Now.ToLocalTime();
        AmqpArguments customHeaders = new AmqpArguments();
        customHeaders.AddInteger("KZNG_AMQP_KEY1", 100);
        customHeaders.AddLongString("KZNG_AMQP_KEY2", "Custom Header Value");
        amqpProperties.Headers = customHeaders;
        publishChannel.PublishBasic(buffer, amqpProperties, exchangeName, routingKey, false, false);
        Log(amqpProperties.ToString());
        Log("Published Message Properties:");
        Log("MESSAGE PUBLISHED: " + text);
    }
    

    The AmqpProperties class defines pre-defined properties as per AMQP 0-9-1 spec and provides type-safe getters and setters for those pre-defined properties. The value of AMQP 0-9-1's standard "headers" property is of type AmqpArguments. The Kaazing Gateway AMQP implementation uses AmqpArguments to encode the table. Similarly, the Kaazing Gateway AMQP implementation decodes the table and constructs an instance of AmqpArguments.

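    The mandatory and immediate arguments are Boolean values, and both are false in this example. No other custom parameters are passed to PublishBasic(); the custom headers travel inside the AmqpProperties object.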
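Because the content body is binary, text must be encoded to bytes before it is published and decoded with the same encoding when it is consumed. The following sketch shows that round trip using only System.Text.Encoding from the .NET base class library rather than the Kaazing ByteBuffer class. The class name MessageBodyCodec is hypothetical.

```csharp
using System;
using System.Text;

// Illustrates the text <-> bytes round trip behind a message body.
// Uses only the .NET base class library, not the Kaazing ByteBuffer class.
public static class MessageBodyCodec
{
    // Publisher side: turn the text into the bytes that form the content body.
    public static byte[] Encode(string text)
    {
        return Encoding.UTF8.GetBytes(text);
    }

    // Consumer side: the same encoding must be used to recover the text.
    public static string Decode(byte[] body)
    {
        return Encoding.UTF8.GetString(body);
    }
}
```

Note that the body length in bytes can be larger than the number of characters, because UTF-8 uses more than one byte for characters outside the ASCII range.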

    The user name set with the UserId property must match the user that is currently authenticated with the AMQP broker. If they do not match, you will see the following error:
    PRECONDITION_FAILED - user_id property set to '<name>' but authenticated user was '<name>'

  19. Consume messages.

    Once messages are published, they can be consumed from a queue. A variety of options can be applied to messages in a queue. For example, publishers can choose to require acknowledgement (ack) of messages so that messages can be redelivered in the case of a delivery failure. If the queue is set to exclusive, it is scoped to just the current connection and deleted when the connection on which it was established is closed. Additionally, you can use the no local setting to notify the broker not to send messages to the connection on which the messages were published. The following example shows how you can consume messages from a queue on the consume channel:

    consumeChannel.DeclareQueue(queueName, false, false, false, false, false, null)
     .BindQueue(queueName, exchangeName, routingKey, false, null)
     .ConsumeBasic(queueName, routingKey, false, false, false, false, null);
    

    Note: In this example, the arguments noLocal, noAck, noWait, and exclusive represent boolean values.

    After the ConsumeBasic() method is successful, a ConsumeEvent event is raised, which calls the previously registered event handler ConsumeHandler. The AMQP broker can then start delivering messages to the client and these messages raise the MessageEvent event, which calls the previously registered event handler MessageHandler. The following example shows how the MessageHandler function retrieves information from the AmqpEventArgs object.

    private void MessageHandler(object sender, AmqpEventArgs e)
    {
        this.BeginInvoke((InvokeDelegate)(() =>
        {
            ByteBuffer buf = e.Body;
            string message = buf.GetString(System.Text.Encoding.UTF8);
            Log(e.AmqpProperties.ToString());
            Log("Consumed Message Properties:");
            Log("MESSAGE CONSUMED: " + message);
    
            // Explicitly acknowledge the message as we are passing
            // 'false' as the value of the 'noAck' parameter in
            // the consumeChannel.ConsumeBasic() call.
            long dt = Convert.ToInt64(e.Arguments["deliveryTag"]);
            ((AmqpChannel)sender).AckBasic(dt, true);
        }));
    }
    

    Here you can see how the message properties are retrieved through the AmqpProperties member of the AmqpEventArgs object. It is an instance of the same AmqpProperties class that was used to encode the properties of the published message.

    Message Acknowledgement

    The Boolean parameter noAck is optional, with a default value of true. If noAck is true, the AMQP broker will not expect any acknowledgement from the client before discarding the message. If noAck is false, the AMQP broker will expect an acknowledgement before discarding the message, and you must explicitly acknowledge each received message using the AckBasic() method of AmqpChannel.

    In the AMQP demo code in this procedure, message acknowledgement is being performed because false was passed in for noAck in ConsumeBasic(). If the client acknowledges a message and noAck is true (the default setting), then the AMQP message broker will close the channel.
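The acknowledgement rules above can be pictured with a small in-memory model of the bookkeeping a broker does for one consumer. This is a sketch under stated assumptions, not Kaazing or broker code: the class AckTrackingSketch is hypothetical, the multiple flag follows the AMQP 0-9-1 meaning (acknowledge every message up to and including the given delivery tag), and the exception stands in for the broker closing the channel.

```csharp
using System;
using System.Collections.Generic;

// Simplified in-memory model of broker-side acknowledgement tracking.
public sealed class AckTrackingSketch
{
    private readonly bool noAck;
    private long nextTag = 1;

    // Messages delivered but not yet acknowledged, keyed by delivery tag.
    private readonly SortedDictionary<long, string> unacked = new SortedDictionary<long, string>();

    public AckTrackingSketch(bool noAck) { this.noAck = noAck; }

    public int UnackedCount { get { return unacked.Count; } }

    // Delivers a message and returns its delivery tag.
    public long Deliver(string message)
    {
        long tag = nextTag++;
        // With noAck the broker forgets the message as soon as it is sent.
        if (!noAck) unacked[tag] = message;
        return tag;
    }

    // Acknowledges one message, or every message up to and including the tag
    // when 'multiple' is true.
    public void Ack(long deliveryTag, bool multiple)
    {
        if (noAck) throw new InvalidOperationException("Acknowledgement is not expected when noAck is true.");

        var toRemove = new List<long>();
        foreach (long tag in unacked.Keys)
        {
            if (tag == deliveryTag || (multiple && tag < deliveryTag)) toRemove.Add(tag);
        }
        foreach (long tag in toRemove) unacked.Remove(tag);
    }
}
```

In this model, delivering three messages with noAck set to false and then calling Ack(2, true) leaves only the third message unacknowledged.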

  20. Use transactions.

    AMQP supports transactional messaging through server local transactions. In a transaction, the server publishes a set of messages as one unit only when the client commits the transaction. Transactions apply only to message publishing and not to the consumption of the messages.

    Note: Once you commit or roll back a transaction on a channel, a new transaction is started automatically. For this reason, you must commit all future messages you want to publish on that channel, or create a new, non-transactional channel on which to publish messages.
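The following in-memory sketch models the behavior just described: messages published on a transactional channel are held back until commit, a rollback discards them, and the channel stays transactional afterward. It does not use the Kaazing library. The class TxChannelSketch and its members are hypothetical names chosen to mirror the methods used in this step.

```csharp
using System;
using System.Collections.Generic;

// Simplified in-memory model of a transactional publish channel.
public sealed class TxChannelSketch
{
    private readonly List<string> pending = new List<string>();   // published, not yet committed
    private readonly List<string> delivered = new List<string>(); // visible to consumers
    private bool transactional;

    public IList<string> Delivered { get { return delivered.AsReadOnly(); } }

    // Switches the channel into transactional mode; there is no way back.
    public void SelectTx() { transactional = true; }

    public void Publish(string message)
    {
        if (transactional) pending.Add(message);
        else delivered.Add(message);
    }

    // Releases all pending messages as one unit; a new transaction starts implicitly.
    public void CommitTx()
    {
        RequireTx();
        delivered.AddRange(pending);
        pending.Clear();
    }

    // Discards all pending messages; a new transaction starts implicitly.
    public void RollbackTx()
    {
        RequireTx();
        pending.Clear();
    }

    private void RequireTx()
    {
        if (!transactional) throw new InvalidOperationException("Call SelectTx() first.");
    }
}
```

Publishing after a commit in this model still requires another commit before the message becomes visible, which is why the demo uses separate channels for transactional and non-transactional publishing.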

    The following transaction-related methods can be used to select (start), commit, and roll back a transaction (shown here inside their event handlers):

    /*
     * Button click handler for Select button!
     */
    private void SelectTx_Click(object sender, EventArgs e)
    {
        Log("TXN SELECT/START");
        txnPublishChannel.SelectTx();
    }
    
    /*
     * Button click handler for Commit transaction button!
     */
    private void CommitTx_Click(object sender, EventArgs e)
    {
        Log("TXN COMMIT");
        txnPublishChannel.CommitTx();
    }
    
    /*
     * Button click handler for Rollback transaction button!
     */
    private void RollbackTx_Click(object sender, EventArgs e)
    {
        Log("TXN ROLLBACK");
        txnPublishChannel.RollbackTx();
    }
    

    After the transaction is successfully selected, committed, or rolled back, the corresponding events (SelectTransactionEvent, CommitTransactionEvent, and RollbackTransactionEvent) are raised. These events call previously registered event handlers. Each of the event handlers has an associated function that processes the event.

    txnPublishChannel.CommitTransactionEvent += CommitOkTxHandler;
    txnPublishChannel.RollbackTransactionEvent += RollbackOkTxHandler;
    txnPublishChannel.SelectTransactionEvent += SelectOkTxHandler;
  21. Control message flow.

    You can use flow control in AMQP to temporarily—or permanently—halt the flow of messages on a channel from a queue to a consumer. If you turn the message flow off, no messages are sent to the consumer. The following example shows how you can turn the flow of messages on a channel off and back on:

    /*
    *  Button click handler for Flow On button!
    */
    private void Flow_On_Click(object sender, EventArgs e)
    {
        consumeChannel.FlowChannel(true);
    }
    
    /*
    *  Button click handler for Flow Off button!
    */
    private void Flow_Off_Click(object sender, EventArgs e)
    {
        consumeChannel.FlowChannel(false);
    }
    

    After the flow on a channel is halted or resumed successfully, a FlowEvent event is raised, which calls the previously registered event handler FlowHandler.
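As a rough picture of what flow control does, the following sketch models a channel whose messages are held back while flow is off and released in order when flow is turned on again. It is an in-memory illustration that assumes the messages stay queued while flow is off; it does not use the Kaazing library, and the class FlowControlSketch and its members are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Simplified in-memory model of channel flow control.
public sealed class FlowControlSketch
{
    private readonly Queue<string> held = new Queue<string>();   // waiting while flow is off
    private readonly List<string> received = new List<string>(); // handed to the consumer
    private bool flowOn = true;

    public IList<string> Received { get { return received.AsReadOnly(); } }
    public int HeldCount { get { return held.Count; } }

    // Models a message becoming ready for the consumer.
    public void Enqueue(string message)
    {
        if (flowOn) received.Add(message);
        else held.Enqueue(message);
    }

    // Models turning flow on or off; turning it on releases held messages in order.
    public void SetFlow(bool active)
    {
        flowOn = active;
        while (flowOn && held.Count > 0) received.Add(held.Dequeue());
    }
}
```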

  22. Handle exceptions.

    Channels are lightweight and cheap, so they are used in AMQP's exception handling mechanism: channels are closed when an exception occurs. When the CloseEvent event is raised on the publish channel, the previously registered PublishChannelCloseHandler function processes the event. The following example shows how that function can be used to log a message when the channel is closed:

    private void PublishChannelCloseHandler(object sender, AmqpEventArgs e)
    {
        this.BeginInvoke((InvokeDelegate)(() =>
        {
            Log("CLOSED: PUBLISH CHANNEL");
        }));
    }
    
  23. Configure Kaazing WebSocket Gateway to connect to an AMQP broker. If you are using a local Kaazing WebSocket Gateway, the following is an example of the default configuration element for the amqp.proxy service in the Kaazing WebSocket Gateway, as specified in the configuration file GATEWAY_HOME/conf/gateway-config.xml:

    <service>
      <accept>ws://localhost:8001/amqp</accept>
      <connect>tcp://localhost:5672</connect>

      <type>amqp.proxy</type>

      <!--
      <authorization-constraint>
        <require-role>AUTHORIZED</require-role>
      </authorization-constraint>
      -->

      <cross-site-constraint>
        <allow-origin>http://localhost:8001</allow-origin>
      </cross-site-constraint>
    </service>

    In this case, the service is configured to accept WebSocket AMQP requests from the client at ws://localhost:8001/amqp and proxy those requests to a locally installed AMQP broker (localhost) at port 5672.

    To configure the Gateway to accept WebSocket requests at another URL or to connect to a different AMQP broker, edit GATEWAY_HOME/conf/gateway-config.xml, update the value of the accept element, change the value of the connect element, and restart the Gateway. For example, the following configuration sets up Kaazing Gateway to accept WebSocket AMQP requests at ws://www.example.com:80/amqp and proxy those requests to an AMQP broker (amqp.example.com) on port 5672.

    <service>
      <accept>ws://www.example.com:80/amqp</accept>
      <connect>tcp://amqp.example.com:5672</connect>

      <type>amqp.proxy</type>
    </service>
  24. Set up an AMQP broker.

    Note: The Kaazing Gateway AMQP client libraries are compatible with AMQP version 0-9-1. Refer to your AMQP broker documentation for information about supported AMQP versions.

    A wide variety of AMQP brokers are available that implement different AMQP versions, for example RabbitMQ, Apache Qpid, OpenAMQ, Red Hat Enterprise MRG, ZeroMQ, and Zyre. If you do not have an AMQP broker installed yet, you can use the Apache Qpid AMQP broker, which supports AMQP version 0-9-1. To set up the Apache Qpid broker on your system, perform the steps described in Setting Up the Gateway and Clients.

Other Coding Styles

In this procedure, you have used an event programming style. You can also use a continuation-passing programming style. The following example shows how you can declare an exchange using the continuation-passing programming style:

publishChannel.DeclareExchange(exchangeName, "direct", passive, durable, noWait, null, declareExchangeHandlerContinuation, errorHandlerContinuation);

You can also combine the two programming styles.
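To compare the two styles side by side without a Gateway or broker, the following sketch uses a hypothetical stand-in class, StyleDemoChannel, that completes its operation synchronously. It is not the Kaazing AmqpChannel class and its signatures are assumptions made for illustration. It shows only the general shape of each style: subscribing to an event versus passing the next step and an error step as arguments.

```csharp
using System;

// Hypothetical stand-in for a channel; it completes operations synchronously
// so that both styles can be compared without a Gateway or broker.
public sealed class StyleDemoChannel
{
    // Event style: subscribers are notified after every successful declare.
    public event Action<string> ExchangeDeclared;

    // Event style entry point: the result is reported only through the event.
    public void DeclareExchange(string name)
    {
        DeclareExchange(name, null, null);
    }

    // Continuation-passing style: the caller supplies the next step and an error step.
    public void DeclareExchange(string name, Action<string> onSuccess, Action<Exception> onError)
    {
        if (string.IsNullOrEmpty(name))
        {
            var error = new ArgumentException("Exchange name is required.");
            if (onError != null) onError(error); else throw error;
            return;
        }

        // Combined use: the continuation runs first, then any event subscribers.
        if (onSuccess != null) onSuccess(name);
        if (ExchangeDeclared != null) ExchangeDeclared(name);
    }
}
```

In the event style, a handler is attached once and reacts to every declare. In the continuation-passing style, the follow-up logic sits next to the call that triggers it, which can make a sequence of dependent steps easier to read.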

Next Step

Secure Your .NET AMQP Client