Java Server Sent Events Chat Example (SSE)

After writing an article introducing Server Sent Events with a real-life PHP SSE example, I decided to write another one, this time about using Server Sent Events in Java and Tomcat. Before you start, I strongly recommend reading the first article and looking at the PHP code, because this time I will not describe how Server Sent Events (SSE) work. In short, the idea is to write a simple Java chat application. Java is used on the back-end, with just one Servlet that runs on Apache Tomcat or any other Servlet-compatible container. The front-end uses JavaScript and just a few lines of HTML and CSS.

As you may or may not know, SSE is one-way communication from server to browser. Clients subscribe to a URL and listen, and when the server has something to send, it sends it. But here comes the first problem: web chat is two-way communication. Users type something that is sent to other users. For our simple purposes this can be achieved using AJAX calls. Let's go through the example step by step and you will get the idea.

If you would rather see the demo and study the source code than read the article, you can skip ahead now: Try Demo or Download Source Code.

Java Chat Message Class

First of all we need to declare a simple Java message class. For our test purposes it has only ID and message text attributes. You can extend it by adding a time-stamp, a sender ID or name, the sender's IP address, or any other information that could be stored in a DB and be useful to users of the application.

package com.howopensource.demo.chat;

public class Message {

    private long id;
    private String message;

    public Message(long id, String message) {
        this.id = id;
        this.message = message;
    }

    public long getId() {
        return id;
    }

    public String getMessage() {
        return message;
    }
}
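As a rough illustration of the extension suggested above, here is a minimal sketch of a message that also carries a sender name and a creation time. The class name TimestampedMessage, the sender field, and the toDisplayText helper are assumptions made for this example; they are not part of the demo source.

```java
import java.time.Instant;
import java.util.Objects;

// Hypothetical extension of Message: adds a sender name and a creation time.
final class TimestampedMessage {

    private final long id;
    private final String sender;
    private final String message;
    private final Instant createdAt;

    public TimestampedMessage(long id, String sender, String message, Instant createdAt) {
        this.id = id;
        this.sender = Objects.requireNonNull(sender, "sender");
        this.message = Objects.requireNonNull(message, "message");
        this.createdAt = Objects.requireNonNull(createdAt, "createdAt");
    }

    public long getId() {
        return id;
    }

    public String getSender() {
        return sender;
    }

    public String getMessage() {
        return message;
    }

    public Instant getCreatedAt() {
        return createdAt;
    }

    // Text a chat window could display, in the form "sender: message"
    public String toDisplayText() {
        return sender + ": " + message;
    }
}
```

All fields are final, so an instance can be shared between the request threads and the notifier thread without extra synchronization.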

Chat Servlet

The next step is to write a Java Servlet that handles requests and sends responses to clients. This Servlet needs a few fields. The first, counter, gives unique IDs to messages; in real life the database would do this. We need to start a Thread inside the Servlet, and the running field controls it. We also need to keep all open connections from browsers, so that a new message can be sent to all of them when it arrives. We need a queue that holds messages as they arrive, and finally a simple message store that keeps the most recent messages. In real life the store could be a database, but for our demo purposes it is simple in-memory storage. Here is the code.

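package com.howopensource.demo.chat;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.*;

@SuppressWarnings("serial")
@WebServlet("/chat")
public class ChatServlet extends HttpServlet {

    private AtomicLong counter = new AtomicLong();
    // volatile so the notifier thread sees updates made in init() and destroy()
    private volatile boolean running;

    // Keeps all open connections from browsers
    private Map<String, AsyncContext> asyncContexts = new ConcurrentHashMap<>();

    // Temporary store for messages when they arrive
    private BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();

    // Keeps the last messages
    private List<Message> messageStore = new CopyOnWriteArrayList<>();
}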

The next step is to write an inner Thread that processes messages and sends them to clients, that is, to all open connections from browsers. Here is how it looks. Note that storing messages this way is not what you want in a real scenario; it is here only for demo purposes.

// Thread that waits for a new message and then redistributes it
private Thread notifier = new Thread(new Runnable() {

    @Override
    public void run() {
        while (running) {
            try {
                // Waits until a message arrives
                Message message = messageQueue.take();

                // Put a message in a store
                messageStore.add(message);

                // Keep only last 100 messages
                if (messageStore.size() > 100) {
                    messageStore.remove(0);
                }

                // Sends the message to all the AsyncContext's response
                for (AsyncContext asyncContext : asyncContexts.values()) {
                    try {
                        sendMessage(asyncContext.getResponse().getWriter(), message);
                    } catch (Exception e) {
                        // In case of exception remove context from map
                        asyncContexts.values().remove(asyncContext);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted (for example during shutdown): restore the flag and stop
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
});
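The per-message work of the notifier (store the message, keep only the most recent ones, deliver to every listener, and drop listeners that fail) can be tried outside a Servlet container. The sketch below is a simplified stand-in, not the demo's code: the Broadcaster class is hypothetical, and plain Consumer<String> callbacks take the place of AsyncContext connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the notifier's per-message work.
// Consumer<String> callbacks replace the AsyncContext connections.
final class Broadcaster {

    private final int maxHistory;
    private final List<String> history = new CopyOnWriteArrayList<>();
    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();

    public Broadcaster(int maxHistory) {
        this.maxHistory = maxHistory;
    }

    public void subscribe(String id, Consumer<String> listener) {
        listeners.put(id, listener);
    }

    // Stores the message, trims the history, then delivers to every listener.
    // A listener that throws is treated as a dead connection and removed.
    public void dispatch(String message) {
        history.add(message);
        while (history.size() > maxHistory) {
            history.remove(0);
        }
        for (Map.Entry<String, Consumer<String>> entry : listeners.entrySet()) {
            try {
                entry.getValue().accept(message);
            } catch (RuntimeException e) {
                // ConcurrentHashMap tolerates removal while iterating
                listeners.remove(entry.getKey());
            }
        }
    }

    // Snapshot of the stored messages, oldest first
    public List<String> history() {
        return new ArrayList<>(history);
    }

    public int listenerCount() {
        return listeners.size();
    }
}
```

Removing a failed listener by its key, rather than by value, avoids scanning the whole map for each dead connection.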

The next step is to write the doGet method, which handles incoming users who want to join the chat. Note the Tomcat-specific request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true) call, which may not be necessary if you are using another Servlet container. The portable Servlet 3.0 alternative is to declare the Servlet with asyncSupported = true in the @WebServlet annotation. The first lines of the method check whether the user is arriving on the page for the first time and, if so, forward to the given JSP page. This could be done in a different way, with a separate Servlet or by going directly to the JSP.

The next important thing is to set the correct HTTP headers. Then we need to check for the last message ID. If the connection was dropped for some reason, the browser reconnects automatically after a given time, which is roughly 3 seconds by default, depending on the browser. We set it to 1 second with this line of code: response.getWriter().println("retry: 1000\n"). The last part is to start the asynchronous context and add a listener, mainly to remove the context from the map of open connections when the connection completes, fails, or times out.

Here is how the code looks.

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

    // This is for loading home page when user comes for the first time
    if (request.getAttribute("index") != null) {
        request.setAttribute("messages", messageStore);
        request.getRequestDispatcher("/chat.jsp").forward(request, response);
        return;
    }

    // Check that it is SSE request
    if ("text/event-stream".equals(request.getHeader("Accept"))) {

        // This is Tomcat specific - makes request asynchronous
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

        // Set header fields
        response.setContentType("text/event-stream");
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Connection", "keep-alive");
        response.setCharacterEncoding("UTF-8");

        // Parse Last-Event-ID header field which should contain last event received
        String lastMsgId = request.getHeader("Last-Event-ID");
        if ((lastMsgId != null) && !lastMsgId.trim().isEmpty()) {
            long lastId = 0;
            try {
                lastId = Long.parseLong(lastMsgId);
            } catch (NumberFormatException e) {
                // Do nothing as we have default value
            }
            if (lastId > 0) {
                // Send all messages that were not sent yet - i.e. with a higher id
                for (Message message : messageStore) {
                    if (message.getId() > lastId) {
                        sendMessage(response.getWriter(), message);
                    }
                }
            }
        } else {
            long lastId = 0;
            try {
                lastId = messageStore.get(messageStore.size() - 1).getId();
            } catch (Exception e) {
                // Do nothing as this just gets the last id
            }
            if (lastId > 0) {
                // Send some ping with the last id. Idea is browser to be informed
                // which is the last event id. Also tell the browser if connection
                // fails to reopen it after 1000 milliseconds
                response.getWriter().println("retry: 1000\n");
                Message message = new Message(lastId, "Welcome to chat, type message and press Enter to send it.");
                sendMessage(response.getWriter(), message);
            }
        }

        // Generate some unique identifier used to store context in map
        final String id = UUID.randomUUID().toString();

        // Start asynchronous context and add listeners to remove it in case of errors
        final AsyncContext ac = request.startAsync();
        ac.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                asyncContexts.remove(id);
            }
            @Override
            public void onError(AsyncEvent event) throws IOException {
                asyncContexts.remove(id);
            }
            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                // Do nothing
            }
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                asyncContexts.remove(id);
            }
        });

        // Put context in a map
        asyncContexts.put(id, ac);
    }
}
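The replay decision in doGet can be isolated into two small pure functions, which makes it easy to test without a container. This is a minimal sketch under assumptions: the Replay class and its method names are invented for this example, and the store is reduced to a list of message IDs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the Last-Event-ID handling in doGet.
final class Replay {

    // Parses a Last-Event-ID header value; returns 0 when absent or malformed.
    public static long parseLastEventId(String header) {
        if (header == null || header.trim().isEmpty()) {
            return 0L;
        }
        try {
            return Long.parseLong(header.trim());
        } catch (NumberFormatException e) {
            return 0L;
        }
    }

    // Returns the stored IDs that are newer than the client's last event ID,
    // in store order. Nothing is replayed when the header gives no usable ID.
    public static List<Long> missedIds(List<Long> storedIds, String header) {
        long lastId = parseLastEventId(header);
        List<Long> missed = new ArrayList<>();
        if (lastId <= 0) {
            return missed;
        }
        for (Long id : storedIds) {
            if (id > lastId) {
                missed.add(id);
            }
        }
        return missed;
    }
}
```

For example, with stored IDs 1 to 4 and a header value of "2", missedIds returns the IDs 3 and 4, which are the messages the browser did not see before it reconnected.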

The next step is to write a doPost method that handles incoming messages. As we said, these messages are sent via AJAX calls from browsers, and here is a simple method that handles them. First we set the character encoding of the request. In a real scenario this should happen in a filter, but for simplicity we do not have one. Note that this is very simple, basic processing. You may want to collect and store more information, such as a time-stamp, the IP address of the sender, the user name if applicable, and so on. Messages may also be saved in a database here.

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    // Sets char encoding - should not be done here, better in filter
    request.setCharacterEncoding("UTF-8");

    // Gets message from request
    String message = request.getParameter("msg");

    // Do some verification and save message into DB, etc.
    if ((message != null) && !message.trim().isEmpty()) {
        try {
            // Save message
            // db.saveMessage(message);

            // Create new simple message
            Message msg = new Message(counter.incrementAndGet(), message.trim());
            // Put message into messageQueue
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}
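One piece of verification worth considering in doPost: the browser code in this article appends e.data to innerHTML, so message text containing markup would be interpreted as HTML by every connected client. A minimal sketch of server-side escaping follows; the HtmlEscaper class is an assumption for illustration and is not part of the demo source.

```java
// Hypothetical helper: escapes the characters that are significant in HTML,
// so chat text is displayed literally instead of being parsed as markup.
final class HtmlEscaper {

    public static String escape(String text) {
        StringBuilder out = new StringBuilder(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '&':
                    out.append("&amp;");
                    break;
                case '<':
                    out.append("&lt;");
                    break;
                case '>':
                    out.append("&gt;");
                    break;
                case '"':
                    out.append("&quot;");
                    break;
                case '\'':
                    out.append("&#39;");
                    break;
                default:
                    out.append(c);
            }
        }
        return out.toString();
    }
}
```

It could be applied where the Message is created, for example new Message(counter.incrementAndGet(), HtmlEscaper.escape(message.trim())). Escaping on output or setting textContent in the browser are alternative designs.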

The last step is to write a small method that sends a message to a client, and to override the init and destroy methods.

private void sendMessage(PrintWriter writer, Message message) {
    writer.print("id: ");
    writer.println(message.getId());
    writer.print("data: ");
    writer.println(message.getMessage());
    writer.println();
    writer.flush();
}

@Override
public void destroy() {
    // Stops thread and clears queue and stores
    running = false;
    // Wake the notifier if it is blocked in messageQueue.take()
    notifier.interrupt();
    asyncContexts.clear();
    messageQueue.clear();
    messageStore.clear();
}

@Override
public void init(ServletConfig config) throws ServletException {
    super.init(config);

    // Load previous messages from DB into messageStore
    // messageStore.addAll(db.loadMessages(100));

    // Start thread
    running = true;
    notifier.start();
}
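The sendMessage method above writes one event in the SSE wire format: an id line, a data line, and a blank line that ends the event. In that format a line break inside the data would end the field early, so text with several lines has to be sent as several data lines. Here is a minimal sketch of a formatter that handles this case; the SseFormat class is hypothetical and builds a String instead of writing to the response.

```java
// Hypothetical formatter for a single SSE event.
final class SseFormat {

    // Builds "id: <id>" followed by one "data: <line>" per line of text,
    // terminated by the blank line that marks the end of the event.
    public static String formatEvent(long id, String data) {
        StringBuilder sb = new StringBuilder();
        sb.append("id: ").append(id).append('\n');
        // Split on CRLF, CR, or LF; limit -1 keeps trailing empty lines
        for (String line : data.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n');
        return sb.toString();
    }
}
```

The browser joins consecutive data lines of one event with a newline, so e.data on the client contains the original multi-line text.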

JavaScript code to handle SSE

Finally, you need to write some JavaScript code that handles the events. Here is the code: a simple and straightforward implementation that just appends each message to a scrollable DIV element.

// Check that browser supports EventSource 
if (!!window.EventSource) {
    // Subscribe to url to listen
    var source = new EventSource('/chat');

    // Define what to do when server sent new event
    source.addEventListener("message", function(e) {
        var el = document.getElementById("chat"); 
        el.innerHTML += e.data + "<br/>";
        el.scrollTop += 50;
    }, false);
} else {
    alert("Your browser does not support EventSource!");
}

AJAX call to send message to server

Here is the JavaScript AJAX function that sends a message to the server. It is called when the user clicks the button.

function sendMsg(form) {

    if (form.msg.value.trim() == "") {
        alert("Empty message!");
        // Nothing to send; also prevents the normal form submit
        return false;
    }
    
    // Init http object
    var http = false;
    if (typeof ActiveXObject != "undefined") {
        try {
            http = new ActiveXObject("Msxml2.XMLHTTP");
        } catch (ex) {
            try {
                http = new ActiveXObject("Microsoft.XMLHTTP");
            } catch (ex2) {
                http = false;
            }
        }
    } else if (window.XMLHttpRequest) {
        try {
            http = new XMLHttpRequest();
        } catch (ex) {
            http = false;
        }
    }

    if (!http) {
        alert("Unable to connect!");
        return false;
    }

    // Prepare data
    var parameters = "msg=" + encodeURIComponent(form.msg.value.trim());

    http.onreadystatechange = function () {
        if (http.readyState == 4 && http.status == 200) {
            if (typeof http.responseText != "undefined") {
                var result = http.responseText;
                form.msg.value = "";
            }
        }
    };

    http.open("POST", form.action, true);
    http.setRequestHeader("Content-type", "application/x-www-form-urlencoded");
    http.send(parameters);

    return false;
}
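The same form-encoded POST can also be built from Java, which is handy for exercising doPost without a browser. This is a sketch under assumptions: it requires Java 11 or later for java.net.http, the ChatPost class is hypothetical, and the URL in the usage note is a placeholder for wherever the Servlet is deployed.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical test client helper for the chat Servlet's doPost.
final class ChatPost {

    // Builds the request body in the same "msg=<encoded text>" shape
    // that the JavaScript sendMsg function sends.
    public static String buildBody(String text) {
        return "msg=" + URLEncoder.encode(text.trim(), StandardCharsets.UTF_8);
    }

    // Builds (but does not send) the POST request for the given chat URL.
    public static HttpRequest buildRequest(URI chatUrl, String text) {
        return HttpRequest.newBuilder(chatUrl)
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(text)))
                .build();
    }
}
```

To send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()), using for example URI.create("http://localhost:8080/chat") if the application is deployed at the root context. URLEncoder writes spaces as "+" where encodeURIComponent writes "%20"; both are decoded to a space for form-encoded parameters.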

Show me example

Here is the live demo, where you can play with the chat and download and study the code.


  • Ande

    Hi Sir, very good explanation. I deployed the above demo code in Tomcat 7 and it is running fine. But when I deploy the same in WebLogic 11g (10.3.6.0) it gives "java.lang.NoSuchMethodError: javax.servlet.http.HttpServletRequest.startAsync()Ljavax/servlet/AsyncContext;". The WebLogic 11g server is a Servlet 2 container, but AsyncContext is only available in Servlet 3. The problem would be solved if I used WebLogic 12c, but I need to make it run in WebLogic 11. Any inputs on this?