Tuesday, September 28, 2010

ActiveMQ Message Groups

The JMS specification does not provide a lot of guidance about the use of Message Groups. Here is what it says:

  Property Name   Type     Set By   Use
  JMSXGroupID     String   Client   The identity of the message group this message is part of
  JMSXGroupSeq    int      Client   The sequence number of this message within the group; the first message is 1, the second 2,…
JMSXGroupID and JMSXGroupSeq are standard properties clients should use if they want to group messages. All providers must support them.
And that's about it… clear, right???

This post is about how to use Message Groups within ActiveMQ. The ActiveMQ documentation on Message Groups (http://activemq.apache.org/message-groups.html) has gotten much better, but there are still some nuances that users should be aware of.

First off, why should you care about Message Groups…

Message Groups are used to ensure that all the messages in a given group are processed, in order, by one and only one message consumer on a queue (point to point). The message producer controls which messages are in which group, if any, by setting the JMSXGroupID message property.

When message order matters, it's much easier to ensure correct processing if only one message consumer is receiving the messages. The problem is that this does not scale; if you have more messages than a single consumer can process in a timely fashion, you need to add more consumers. Once multiple consumers are receiving messages, you lose the easy ability to process messages in order, because messages are dispatched to the available consumers based on a variety of criteria: round robin, least load, or simply the fastest consumer getting more messages. Message Groups let you specify that all the messages in a given group must be processed by one and only one consumer, while all the other messages (ungrouped, or in different groups) can still be load balanced across consumers.

The classic example for Message Groups is a stock feed. For any given stock (IBM, MSFT, ORCL), a queue might receive messages for quotes, buy orders, sell orders, bids, etc. If you're trading a given stock, it is important that your trading application sees all of the messages for that stock in order so that you can make the best trade (not get outbid, not pay too much, buy when someone sells cheap, etc.). The challenge is that there are a lot of stocks, and a lot of messages per stock, so one message consumer cannot possibly handle the load. Enter Message Groups. If each message is put into a message group by stock ticker (e.g. the Message Producer sets "JMSXGroupID = AAPL" in the message's properties), then ActiveMQ will ensure that only one consumer gets all the messages for that group, in order. Pretty cool…
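
To make the dispatch rule concrete, here is a minimal in-memory sketch in Java. GroupDispatchModel and the consumer names are hypothetical, and round robin is assumed as the load balancing policy purely for illustration. This is not how ActiveMQ is implemented internally; it only models the behavior described above: the first message of a group picks a consumer, later messages in that group stick to it, and ungrouped messages keep getting spread around.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of Message Group dispatch on a single queue (NOT ActiveMQ's code).
final class GroupDispatchModel {
    private final List<String> consumers;                              // hypothetical consumer ids
    private final Map<String, String> groupOwners = new HashMap<>();   // JMSXGroupID -> owning consumer
    private int next = 0;                                              // round-robin cursor

    GroupDispatchModel(List<String> consumers) {
        this.consumers = new ArrayList<>(consumers);
    }

    // Round robin stands in for the broker's real dispatch policy (an assumption).
    private String nextConsumer() {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers");
        }
        String c = consumers.get(next % consumers.size());
        next++;
        return c;
    }

    /** Returns the consumer that receives a message with this group id (null means ungrouped). */
    String dispatch(String groupId) {
        if (groupId == null) {
            return nextConsumer();                                     // no group: plain load balancing
        }
        // First message of a group picks an owner; every later message reuses it.
        return groupOwners.computeIfAbsent(groupId, g -> nextConsumer());
    }
}
```

With consumers c1 and c2, every IBM message lands on c1 and every MSFT message lands on c2, while ungrouped messages keep alternating between the two.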

So how does this work in ActiveMQ…

Message Groups are controlled by the Message Producer, and using them is as simple as setting a message property:

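  // Put the message in a group; ActiveMQ delivers every message with
  // this JMSXGroupID to the same consumer
  Message message = session.createTextMessage("hey");
  message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
  producer.send(message);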

That's it. Now only one message consumer will get all the messages in that group (i.e. JMSXGroupID = IBM_NASDAQ_20/4/05).

Other things you can do with Message Groups…

The other producer-controlled feature of Message Groups is the Message Group sequence number, JMSXGroupSeq. In ActiveMQ, valid sequence numbers must be greater than 0 (i.e. start with 1). ActiveMQ takes no special action based on the sequence number, and does not enforce that sequence numbers increase, are unique, etc. They are there as a convenience for you, the developer, and it is your responsibility to make them meaningful to your application.
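
Since keeping JMSXGroupSeq meaningful is up to you, one option is a small producer-side helper. GroupSequencer below is a hypothetical class (not part of JMS or ActiveMQ), sketched under the assumption that a single producer thread sends all the messages for a group, which is why a plain HashMap is enough:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical producer-side helper: hands out JMSXGroupSeq values per JMSXGroupID,
// starting at 1. Not thread-safe; assumes one producer thread per group.
final class GroupSequencer {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Next sequence number for the group: 1, 2, 3, ... */
    int next(String groupId) {
        return lastSeq.merge(groupId, 1, Integer::sum);
    }

    /** Forget a group so its next message starts again at 1 (e.g. after closing the group). */
    void reset(String groupId) {
        lastSeq.remove(groupId);
    }
}
```

Each outgoing message would then be stamped with something like message.setIntProperty("JMSXGroupSeq", sequencer.next("IBM")) before producer.send(message). ActiveMQ will not check these values, so the helper only gives your consumers numbers they can rely on.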

The only time ActiveMQ cares about sequence numbers is when they are negative (less than 0). If you set JMSXGroupSeq = -1, ActiveMQ will close the Message Group. What does it mean for the group to be closed? It means that any future messages in that Message Group could be dispatched to a different consumer. That is, it's as though this were the first use of that Message Group: ActiveMQ will assign the re-opened group (the messages in that group sent after the JMSXGroupSeq = -1 message) to one and only one consumer, which may or may not be the original one.

So when could a Message Group get assigned to a different consumer…

There are only two instances where a Message Group would get re-assigned to a different consumer:
  • Message Group is explicitly closed (JMSXGroupSeq = -1)
  • the original consumer goes away (consumer.close(), loss of network connectivity, consumer process dies, …)

In either of these two scenarios, ActiveMQ will reassign all future messages in that Message Group to a new consumer, which will then exclusively get all future messages in the group… even if the original consumer comes back online.
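
The two reassignment triggers can be captured in a toy model. GroupReassignmentModel is hypothetical and again assumes round robin when picking a new owner; it sketches the behavior described in this post, not ActiveMQ's code. In this sketch the closing message itself still goes to the current owner, and only the messages after it are up for grabs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT ActiveMQ's code) of the two events that free a Message Group:
// a negative JMSXGroupSeq, and the owning consumer going away.
final class GroupReassignmentModel {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> groupOwners = new HashMap<>(); // JMSXGroupID -> owner
    private int next = 0;                                            // round-robin cursor

    void addConsumer(String id) {
        consumers.add(id);
    }

    /** Consumer closed, lost its connection, or its process died. */
    void removeConsumer(String id) {
        consumers.remove(id);
        groupOwners.values().removeIf(owner -> owner.equals(id));   // its groups become unowned
    }

    /** Returns the consumer that receives a message with this group id and JMSXGroupSeq. */
    String dispatch(String groupId, int groupSeq) {
        String owner = groupOwners.get(groupId);
        if (owner == null) {                                         // first use, or group was freed
            if (consumers.isEmpty()) {
                throw new IllegalStateException("no consumers");
            }
            owner = consumers.get(next++ % consumers.size());
            groupOwners.put(groupId, owner);
        }
        if (groupSeq < 0) {                                          // e.g. JMSXGroupSeq = -1 closes the group
            groupOwners.remove(groupId);
        }
        return owner;
    }
}
```

Walking through it with consumers c1 and c2: the IBM group starts on c1, moves to c2 after a JMSXGroupSeq = -1 message, falls back to c1 when c2 disconnects, and stays on c1 after c2 reconnects.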

Note: there is no way to force ActiveMQ to use a specific consumer for a specific Message Group (short of there only being one consumer for that queue). So if a Message Group consumer, for example, loses network connectivity temporarily, then ActiveMQ will re-assign future messages in that group to one of the surviving consumers.

When ActiveMQ assigns (or reassigns) a Message Group, the first message in that group delivered to a consumer will have a special Boolean property set: JMSXGroupFirstForConsumer = true. This flag lets a consumer know that this is the first message of a group that has just been assigned to it, so the consumer can flush internal caches or do any initial setup work needed to process the messages in that group.
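
On the consumer side, reacting to JMSXGroupFirstForConsumer might look like the sketch below. To keep it runnable without a JMS provider on the classpath, GroupedMessage is a hypothetical stand-in for a received message; with the real API you would read message.getStringProperty("JMSXGroupID") and message.getBooleanProperty("JMSXGroupFirstForConsumer") inside your MessageListener. GroupAwareHandler and its per-group list of message bodies are likewise illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a received JMS message (only the fields this sketch needs).
final class GroupedMessage {
    final String groupId;            // JMSXGroupID
    final boolean firstForConsumer;  // JMSXGroupFirstForConsumer
    final String body;

    GroupedMessage(String groupId, boolean firstForConsumer, String body) {
        this.groupId = groupId;
        this.firstForConsumer = firstForConsumer;
        this.body = body;
    }
}

// Keeps per-group state and starts it fresh whenever a group is (re)assigned here.
final class GroupAwareHandler {
    private final Map<String, List<String>> stateByGroup = new HashMap<>();

    void onMessage(GroupedMessage m) {
        if (m.firstForConsumer) {
            // Group was just assigned to this consumer: drop stale state from an earlier assignment.
            stateByGroup.put(m.groupId, new ArrayList<>());
        }
        stateByGroup.computeIfAbsent(m.groupId, g -> new ArrayList<>()).add(m.body);
    }

    List<String> state(String groupId) {
        return stateByGroup.getOrDefault(groupId, List.of());
    }
}
```

If the group is taken away and later handed back, the next message carries the flag again and the handler starts over instead of mixing in state from the previous assignment.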

Tips and Tricks…

Since ActiveMQ assigns Message Groups to consumers based on which consumers are available, it can be helpful to tell ActiveMQ to wait for all the consumers to come online before it starts dispatching messages and making Message Group assignments (this helps plain load balancing too). To do this, you can set a couple of Destination Policy entries for that queue: consumersBeforeDispatchStarts and timeBeforeDispatchStarts. Both of these destination properties came into existence as of ActiveMQ 5.3.

  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
      </policyEntries>
    </policyMap>
  </destinationPolicy>

This policy tells ActiveMQ to wait either 2 seconds (2000 ms) or until 2 consumers connect, whichever comes first. You can set either option, both, or neither per destination, or use ActiveMQ destination wildcards as in the example above, where queue=">" matches all queues.
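
The gating rule in this example can be modeled in a few lines. DispatchGate is a hypothetical class, not ActiveMQ's code; it covers only the case shown above where both options are set, and it assumes the wait is measured from when the first consumer connects.

```java
// Toy model of consumersBeforeDispatchStarts + timeBeforeDispatchStarts (NOT ActiveMQ's code).
// Only models the case where both options are set, as in the policyEntry example.
final class DispatchGate {
    private final int consumersBeforeDispatchStarts;
    private final long timeBeforeDispatchStartsMs;

    DispatchGate(int consumersBeforeDispatchStarts, long timeBeforeDispatchStartsMs) {
        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
        this.timeBeforeDispatchStartsMs = timeBeforeDispatchStartsMs;
    }

    /** True once enough consumers are connected or the wait has expired, whichever comes first. */
    boolean canDispatch(int connectedConsumers, long msSinceFirstConsumer) {
        if (connectedConsumers == 0) {
            return false;                                   // nobody to dispatch to yet
        }
        return connectedConsumers >= consumersBeforeDispatchStarts
                || msSinceFirstConsumer >= timeBeforeDispatchStartsMs;
    }
}
```

With new DispatchGate(2, 2000), a lone consumer 500 ms after connecting still waits, while a second consumer arriving (or the 2 seconds running out) opens the gate, so the first groups are spread over everyone who showed up in time.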

The other thing to be aware of is managing a large number of Message Groups. By default, ActiveMQ tracks Message Group owners in a fixed set of 1024 hash buckets (MessageGroupHashBucketFactory), so it is only suited to a limited number of unique Message Group names. If you plan on using more than 1024 Message Group IDs (JMSXGroupID), then you need to configure ActiveMQ to use a different implementation that can manage the larger number of Message Group IDs.

  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">">
          <messageGroupMapFactory>
            <simpleMessageGroupMapFactory/>
          </messageGroupMapFactory>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>

Setting the above Destination Policy allows ActiveMQ to manage a larger set of group IDs at the expense of a little more memory. If you do not make this configuration change (i.e. you keep the default), then as the number of IDs grows (and certainly once it exceeds 1024) you will start to get hash bucket collisions. These can cause ActiveMQ to unexpectedly reassign groups to new consumers, which defeats the purpose of using Message Groups in the first place.
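
Why collisions cause trouble is easiest to see with a toy fixed-size bucket map next to a plain map. Both classes are hypothetical simplifications (the bucket arithmetic in particular is an assumption, not ActiveMQ's exact code), but they show the trade-off: a bucket array uses fixed memory yet lets different group IDs share a slot, while a map keeps one entry per group ID. In Java, "Aa" and "BB" happen to have the same hashCode(), so they always land in the same bucket:

```java
import java.util.HashMap;
import java.util.Map;

// Fixed number of buckets, one owner per bucket (simplified hash-bucket group map).
final class HashBucketGroupMap {
    private final String[] owners;

    HashBucketGroupMap(int bucketCount) {
        owners = new String[bucketCount];
    }

    private int bucket(String groupId) {
        return Math.floorMod(groupId.hashCode(), owners.length);
    }

    String get(String groupId) { return owners[bucket(groupId)]; }

    // Overwrites the owner of any other group that shares this bucket.
    void put(String groupId, String consumerId) { owners[bucket(groupId)] = consumerId; }

    // Clears the bucket, which also frees any other group sharing it.
    void removeGroup(String groupId) { owners[bucket(groupId)] = null; }
}

// One entry per group id: memory grows with the number of groups, but groups never collide.
final class SimpleGroupMap {
    private final Map<String, String> owners = new HashMap<>();

    String get(String groupId) { return owners.get(groupId); }
    void put(String groupId, String consumerId) { owners.put(groupId, consumerId); }
    void removeGroup(String groupId) { owners.remove(groupId); }
}
```

With the bucket map, assigning BB to c2 silently moves Aa to c2 as well, and closing BB also frees Aa; the plain map keeps the two groups independent.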

So that's what I've got on ActiveMQ Message Groups. I hope you find them as useful as I have.