A common use case is the routing of messages from one component within a system to another. The standard API for this type of messaging within Java is the Java Message Service (JMS), but that can be overkill for small embedded systems or if you are implementing an existing messaging protocol like Jabber. Originating from the retepXMPP project, retepTools 9.2 introduces a core messaging API which can form the basis of many simple messaging systems.
Introduction
The Messaging API is relatively simple, comprising four main types in the uk.org.retep.util.messaging package:
- MessagingService – a class that forms the entry point into a messaging system for messages
- Message – an interface defining the message that can be handled by the service
- Component – an interface defining a component that can accept messages
- Router – an interface defining a component that can route messages to another Router or component.
The API is totally generic, so the above types have no concept of the implementation of an address (where messages are sent to) or the route required to send them between Routers; this is left entirely to the implementing code. The only requirement is that the implementing classes obey the contract between hashCode and equals.
The API also provides some concrete implementations of the interfaces. These implementations are themselves generic, but they are also fully concurrent, allowing the implemented messaging system to operate in a fully multi-threaded environment.
The rest of this article will show how to implement a simple messaging system where a group of interacting components can pass messages comprising simple POJOs between themselves.
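To make the hashCode and equals requirement concrete, here is a minimal sketch of a custom address class. SimpleAddress and SimpleAddressDemo are hypothetical names invented for this illustration and are not part of retepTools; the sketch also assumes that addresses end up being used as lookup keys, which is the usual reason the contract matters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical address type, not part of retepTools: an immutable value class
// whose equals() and hashCode() are derived from the same two fields.
final class SimpleAddress
{
    private final String domain;
    private final String name;

    SimpleAddress( String domain, String name )
    {
        this.domain = Objects.requireNonNull( domain );
        this.name = Objects.requireNonNull( name );
    }

    @Override
    public boolean equals( Object obj )
    {
        if( this == obj )
        {
            return true;
        }
        if( !( obj instanceof SimpleAddress ) )
        {
            return false;
        }
        SimpleAddress other = (SimpleAddress) obj;
        return domain.equals( other.domain ) && name.equals( other.name );
    }

    @Override
    public int hashCode()
    {
        // Equal addresses must always produce equal hash codes
        return Objects.hash( domain, name );
    }
}

final class SimpleAddressDemo
{
    // Stores a value under one key instance and reads it back with a
    // different but equal instance.
    static String lookup()
    {
        Map<SimpleAddress, String> table = new HashMap<SimpleAddress, String>();
        table.put( new SimpleAddress( "example.org", "a" ), "component A" );
        return table.get( new SimpleAddress( "example.org", "a" ) );
    }
}
```

If hashCode() were left at its default, the second lookup would typically miss even though the two addresses are equal, which is the kind of failure the contract exists to prevent.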
Concepts
Addressing
The first concept within the Messaging API is addressing. An address is a POJO that defines the Component that the message should be sent to. It also defines where the message was sent from if a response is required. This POJO can be any Java object. For this example we will be using java.lang.String but for more complex systems you would implement a class specifically for that purpose.
For example, in Jabber an address is known as a JID (Jabber ID) and has the form node@domain/resource (where node and resource are optional). In retepXMPP the address is implemented by a JID class which holds those three components individually, allowing messages to be routed by domain first, then by node and finally by resource.
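As a rough illustration of holding the three parts individually, the sketch below splits a node@domain/resource string. SimpleJid is a hypothetical class written for this article, not the retepXMPP JID class, and it deliberately skips the validation and normalisation a real XMPP implementation needs. A real address class would also need equals and hashCode.

```java
// Hypothetical sketch, not the retepXMPP JID class: splits node@domain/resource
// into its three parts, leaving the optional parts as null when absent.
final class SimpleJid
{
    final String node;
    final String domain;
    final String resource;

    private SimpleJid( String node, String domain, String resource )
    {
        this.node = node;
        this.domain = domain;
        this.resource = resource;
    }

    static SimpleJid parse( String jid )
    {
        String rest = jid;

        // The resource is everything after the first '/'
        String resource = null;
        int slash = rest.indexOf( '/' );
        if( slash >= 0 )
        {
            resource = rest.substring( slash + 1 );
            rest = rest.substring( 0, slash );
        }

        // The node is everything before the first '@' in what remains
        String node = null;
        int at = rest.indexOf( '@' );
        if( at >= 0 )
        {
            node = rest.substring( 0, at );
            rest = rest.substring( at + 1 );
        }

        // Whatever is left is the domain
        return new SimpleJid( node, rest, resource );
    }
}
```

With the parts separated like this, a core router could look only at domain while routers further down look at node and then resource.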
Component
The second concept is a Component, represented by the uk.org.retep.util.messaging.Component interface. It represents a POJO that can accept a Message for some form of processing.
Route
The third concept is a Route. A route is a POJO used by a Router to determine where to send messages. Usually a Route implementation would hold the part of an address relevant to the specific Router using it. As an analogy, think of the global Domain Name System (DNS). For DNS you could have an address of “retep.org”. Sending a message to that address would involve the core router using the Route “org” to send the message to the router handling “org”. That router would then send the message to the end Component using the Route “retep”.
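The DNS analogy can be sketched as a small helper that picks the label a router at a given depth would use as its Route. DomainRoutes and routeFor are hypothetical names used only for illustration; they are not part of the Messaging API.

```java
// Hypothetical helper illustrating the DNS analogy; not part of retepTools.
final class DomainRoutes
{
    private DomainRoutes()
    {
    }

    // Returns the label a router at the given depth would use as its Route.
    // Depth 0 is the core router (rightmost label), depth 1 the next one in.
    static String routeFor( String address, int depth )
    {
        String[] labels = address.split( "\\." );
        int index = labels.length - 1 - depth;
        if( depth < 0 || index < 0 )
        {
            throw new IllegalArgumentException(
                "No route at depth " + depth + " for " + address );
        }
        return labels[index];
    }
}
```

For the address “retep.org”, routeFor( "retep.org", 0 ) gives “org” for the core router and routeFor( "retep.org", 1 ) gives “retep” for the router handling “org”.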
Router
The final concept is a Router. A Router, represented by the uk.org.retep.util.messaging.Router interface, is a specialised Component that can contain other Components. It deals with forwarding Messages to one or more Components by using Routes.
A Simple Example
To show how simple it is to set up a MessagingService, we’ll create a simple system comprising three Components that will interact with each other when an event occurs. We’ll call these components A, B and C (yes, imaginative naming here).
Address
For this example our address is a java.lang.String.
Message
The first step is to create the message. This will hold an int describing the event.
package example;

import uk.org.retep.util.messaging.message.AbstractMessage;

public class Event
    extends AbstractMessage<String>
{
    private int event;

    public int getEvent()
    {
        return event;
    }

    public void setEvent( int event )
    {
        this.event = event;
    }
}
AbstractMessage is a concrete implementation of the Message interface implementing all of its methods. The <String> generic defines the type of the address, in this case java.lang.String.
Components
Now for our three components. Here component A will simply send a message to B when its setValue() method is called. B will log that event and forward the message to C. C will then log the event itself and reply to the originator (in this case A) with the negated value. A will then log the returned value.
package example;

import net.jcip.annotations.ThreadSafe;
import uk.org.retep.util.messaging.MessageException;
import uk.org.retep.util.messaging.component.AbstractComponent;

@ThreadSafe
public class A
    extends AbstractComponent<String, Event>
{
    public void setValue( int value )
    {
        System.out.printf( "A.setValue( %d )\n", value );

        // Build an event addressed from A to B
        Event event = new Event();
        event.setFrom( "A" );
        event.setTo( "B" );
        event.setEvent( value );

        try
        {
            send( event );
        }
        catch( MessageException ex )
        {
            ex.printStackTrace();
        }
    }

    @Override
    public void consume( Event message )
        throws MessageException
    {
        // Log the reply that comes back from C
        System.out.printf( "A received event from %s: %d\n",
                           message.getFrom(),
                           message.getEvent() );
    }
}
You’ll notice that the setValue method creates a new Event, sets the from and to addresses and the value of the event, and then sends the message via the send() method. Once the Component is attached to the MessagingService, that message will be sent on to its destination.
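package example;

import net.jcip.annotations.ThreadSafe;
import uk.org.retep.util.messaging.MessageException;
import uk.org.retep.util.messaging.component.AbstractComponent;

@ThreadSafe
public class B
    extends AbstractComponent<String, Event>
{
    @Override
    public void consume( Event event )
        throws MessageException
    {
        System.out.printf( "B received event from %s: %d\n",
                           event.getFrom(),
                           event.getEvent() );

        // Forward to C, keeping the original sender as the from address
        Event newEvent = new Event();
        newEvent.setFrom( event.getFrom() );
        newEvent.setTo( "C" );
        newEvent.setEvent( event.getEvent() );
        send( newEvent );
    }
}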
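package example;

import net.jcip.annotations.ThreadSafe;
import uk.org.retep.util.messaging.MessageException;
import uk.org.retep.util.messaging.component.AbstractComponent;

@ThreadSafe
public class C
    extends AbstractComponent<String, Event>
{
    @Override
    public void consume( Event event )
        throws MessageException
    {
        System.out.printf( "C received event from %s: %d\n",
                           event.getFrom(),
                           event.getEvent() );

        // Reply to the originator with the negated value
        Event newEvent = new Event();
        newEvent.setFrom( "C" );
        newEvent.setTo( event.getFrom() );
        newEvent.setEvent( -event.getEvent() );
        send( newEvent );
    }
}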
Router and Route
Now every messaging service has a core router which accepts every message as it first enters the system. For this example we’ll use the address as the route as it’s a single keyword:
package example;

import net.jcip.annotations.ThreadSafe;
import uk.org.retep.util.messaging.router.AbstractRouter;

@ThreadSafe
public class SimpleRouter
    extends AbstractRouter<String, String>
{
    @Override
    protected String getRoute( String to )
    {
        return to;
    }
}
The generics here define the types for the Address and the Route respectively. As we are using Strings, both are set to String. Our router also implements the getRoute() method, which should form the Route for this instance from the to address. In this case it’s just the address, but usually you would have some other logic here.
Deploy and Run
Now all that’s left is to deploy the system and send a message:
package example;

import uk.org.retep.util.messaging.MessagingService;
import uk.org.retep.util.messaging.Router;

public class Main
{
    private Main()
    {
    }

    public static void main( String... args )
        throws Exception
    {
        // The core router receives every message entering the system
        Router<String, String> router = new SimpleRouter();
        MessagingService<String, String> messagingService =
            new MessagingService<String, String>( router );

        // Attach each component under its address
        A a = new A();
        messagingService.addRoute( "A", a );
        messagingService.addRoute( "B", new B() );
        messagingService.addRoute( "C", new C() );

        messagingService.startService();

        a.setValue( 42 );
    }
}
Here we create the core Router and then the MessagingService using that router. We then create the A component and add it to the service. We then do the same for B and C. Finally we start the service, and then set the value of A to 42. When run we should then see the following as the event propagates through the system:
A.setValue( 42 )
B received event from A: 42
C received event from A: 42
A received event from C: -42
Note: C says it’s received the event from A and not B because the event’s from address is A and not B. This comes from the statement newEvent.setFrom( event.getFrom() ); in B, which copies the original sender onto the forwarded event.
This example is deliberately simple. We could have started the service before adding the components if we wanted to, since components can be added or removed at will.
The example is also simplistic in that every component is added directly to the MessagingService. That is correct for components routed by the core router, but say we had a router called D containing a component called D/A. The component D/A would be attached to router D, and D would in turn be attached to the messaging service.
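The nested arrangement can be illustrated with a self-contained toy model. ToyComponent, ToyRouter, RecordingComponent and NestedRoutingDemo below are hypothetical stand-ins written for this article; they are not the retepTools Component and Router types and make no claim about their real method signatures. The sketch also assumes a “/” separated address, with each router consuming the first segment as its route.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a component; NOT the retepTools interface.
interface ToyComponent
{
    void consume( String to, String payload );
}

// A router is itself a component, so routers can be nested inside routers.
final class ToyRouter
    implements ToyComponent
{
    private final Map<String, ToyComponent> routes =
        new ConcurrentHashMap<String, ToyComponent>();

    void addRoute( String route, ToyComponent component )
    {
        routes.put( route, component );
    }

    @Override
    public void consume( String to, String payload )
    {
        // This router's route is the first path segment; the remainder of
        // the address is handed down to whatever is attached under it.
        int slash = to.indexOf( '/' );
        String route = slash < 0 ? to : to.substring( 0, slash );
        String remainder = slash < 0 ? "" : to.substring( slash + 1 );

        ToyComponent next = routes.get( route );
        if( next == null )
        {
            throw new IllegalArgumentException( "No route for " + to );
        }
        next.consume( remainder, payload );
    }
}

// Records what it receives so the delivery can be inspected afterwards.
// Uses a plain ArrayList, so it is only suitable for this single-threaded demo.
final class RecordingComponent
    implements ToyComponent
{
    final List<String> received = new ArrayList<String>();

    @Override
    public void consume( String to, String payload )
    {
        received.add( payload );
    }
}

final class NestedRoutingDemo
{
    static List<String> run()
    {
        RecordingComponent da = new RecordingComponent();

        ToyRouter d = new ToyRouter();
        d.addRoute( "A", da );          // D/A is attached to router D

        ToyRouter core = new ToyRouter();
        core.addRoute( "D", d );        // D is attached to the core router

        core.consume( "D/A", "hello" ); // core uses route "D", D uses route "A"
        return da.received;
    }
}
```

The point of the sketch is only the shape: the core router never needs to know about D/A directly, it just hands anything routed by “D” to router D and lets D finish the job.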