Conversations and Transactions in Worker Threads

Posted by on Dec 7, 2011 in Blog, Full Stack Implementation, Software Engineering | 6 comments

Conversations and Transactions in Worker Threads with CDI Events and Interceptors

This article is a continuation in the series that describes how to build a Wicket/CDI/JPA stack to make web application development easier. In this article we are going to tackle worker threads.

When a request comes in for a Wicket page wicket-cdi will take care of wrapping it in a conversation, so the code executing inside of it has access to conversational resources such as the EntityManager. However, when we are executing code in a background thread or even in a Servlet (where nothing is managing the conversation), conversational resources are not available and we would get an error when trying to access EntityManager or any other conversational resource. This is the problem we are going to tackle here.

Setup

First, we need to come up with an excuse to have a worker thread. Let's pretend that we have a requirement for our system to generate system reports at some predetermined time interval. We will have many reports, and it should be easy to add new ones. We can do this by creating a scheduler that sends out a CDI event whenever it is time to generate the reports; the beans responsible for report generation can then observe this event and run whenever it fires.

A simple event and scheduler can look like this:

public class GenerateSystemReportEvent {
}
 
@ApplicationScoped
public class SystemReportScheduler {
  @Inject Event<GenerateSystemReportEvent> event; // 1
 
  @PostConstruct
  void start() { //2
    Thread timer = new Thread() {
      public void run() {
        try {
          while (true) {
            Thread.sleep(5000); //3
            event.fire(new GenerateSystemReportEvent()); //4
          }
        } catch (InterruptedException e) {
          return;
        }
      }
    };
    timer.setDaemon(true);
    timer.start();
  }
}
  1. The CDI Event object used to fire the event
  2. The method that will kick off the timer thread
  3. Time interval – for testing we set it to just five seconds
  4. Here we fire the event
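
The hand-rolled timer thread can also be expressed with the JDK's ScheduledExecutorService. The following is a minimal plain-Java sketch rather than code from this series: the PeriodicTrigger class and its firesAtLeast helper are hypothetical names, and the Runnable callback merely stands in for the call to event.fire(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a callback at a fixed interval on a daemon thread.
class PeriodicTrigger {
  private final ScheduledExecutorService executor;

  PeriodicTrigger() {
    // Daemon thread, mirroring timer.setDaemon(true) in the scheduler above.
    ThreadFactory daemonFactory = r -> {
      Thread t = new Thread(r, "report-timer");
      t.setDaemon(true);
      return t;
    };
    executor = Executors.newSingleThreadScheduledExecutor(daemonFactory);
  }

  // The callback stands in for event.fire(new GenerateSystemReportEvent()).
  void start(Runnable callback, long interval, TimeUnit unit) {
    executor.scheduleAtFixedRate(callback, interval, interval, unit);
  }

  void stop() {
    executor.shutdownNow();
  }

  // Demo helper: true if the callback fired at least `times` times before the timeout.
  static boolean firesAtLeast(int times, long intervalMillis, long timeoutMillis) {
    CountDownLatch latch = new CountDownLatch(times);
    PeriodicTrigger trigger = new PeriodicTrigger();
    trigger.start(latch::countDown, intervalMillis, TimeUnit.MILLISECONDS);
    try {
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      trigger.stop();
    }
  }

  public static void main(String[] args) {
    System.out.println(firesAtLeast(3, 10, 5000));
  }
}
```

One difference worth knowing: with scheduleAtFixedRate, an exception escaping the callback suppresses all further executions, so a real scheduler would want to catch and log failures inside the callback.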

Now we can easily implement a report bean by subscribing to the GenerateSystemReportEvent event:

public class EmployeesReportGenerator {
  @Inject EmployeesRepository employees;
 
  public void generate(@Observes GenerateSystemReportEvent e) { //1
    System.out.println("The system contains: " //2
        + employees.count(new EmployeeCriteria()) + " employees");
  }
}
  1. The generate method will be invoked whenever the GenerateSystemReportEvent is fired thanks to the @Observes annotation. Google CDI events if you need more info on how this works.
  2. Here we generate our awesome system report, which will do for our purposes
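
To make the decoupling between the scheduler and the report beans concrete without a CDI container, the observer mechanism can be imitated in a few lines of plain Java. This is only an illustrative sketch: TinyEventBus is a hypothetical class, not a CDI API, and the second "payroll report" observer is invented for the demo. Unlike CDI, which resolves observers by type assignability and qualifiers, this sketch matches on the exact event class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the container's event dispatch; not a CDI API.
class TinyEventBus {
  private final Map<Class<?>, List<Consumer<Object>>> observers = new HashMap<>();

  // Rough equivalent of declaring a method with an @Observes parameter.
  <T> void observe(Class<T> eventType, Consumer<? super T> observer) {
    observers.computeIfAbsent(eventType, k -> new ArrayList<>())
        .add(event -> observer.accept(eventType.cast(event)));
  }

  // Rough equivalent of Event#fire: calls every observer registered for the event's exact class.
  void fire(Object event) {
    List<Consumer<Object>> registered =
        observers.getOrDefault(event.getClass(), Collections.emptyList());
    for (Consumer<Object> observer : registered) {
      observer.accept(event);
    }
  }

  static class GenerateSystemReportEvent {
  }

  // Two report generators react to one event; the sender knows about neither of them.
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    TinyEventBus bus = new TinyEventBus();
    bus.observe(GenerateSystemReportEvent.class, e -> log.add("employees report"));
    bus.observe(GenerateSystemReportEvent.class, e -> log.add("payroll report"));
    bus.fire(new GenerateSystemReportEvent());
    bus.fire("unrelated event"); // no observers are registered for String
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Adding a new report then amounts to registering one more observer, which is the property the event-based design is after.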

Application-Scoped Beans Gotcha

You may think that we are done here, but when we fire up the application nothing happens. This is because SystemReportScheduler#start() is never called. In CDI, beans are lazily instantiated – even the application-scoped ones. Since SystemReportScheduler is never instantiated its post-construct start() method is never called.
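
The effect of lazy instantiation can be reproduced in plain Java. The sketch below is an analogy only, not how a CDI container is implemented: LazyBean is a hypothetical wrapper, and the counter increment plays the role of the post-construct start() method.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazily created application-scoped bean.
class LazyBean<T> {
  private final Supplier<T> factory;
  private T instance;

  LazyBean(Supplier<T> factory) {
    this.factory = factory;
  }

  // The instance, and with it any post-construct logic, is created on first use only.
  synchronized T get() {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }

  // Returns how often the "post-construct" logic ran before and after the bean was used.
  static int[] demo() {
    AtomicInteger starts = new AtomicInteger();
    LazyBean<Object> scheduler = new LazyBean<>(() -> {
      starts.incrementAndGet(); // plays the role of the @PostConstruct start() method
      return new Object();
    });
    int beforeUse = starts.get(); // nobody has asked for the bean yet
    scheduler.get();
    scheduler.get(); // the second access reuses the same instance
    int afterUse = starts.get();
    return new int[] { beforeUse, afterUse };
  }

  public static void main(String[] args) {
    int[] result = demo();
    System.out.println("before use: " + result[0] + ", after use: " + result[1]);
  }
}
```

As long as nothing asks for the bean, the start logic never runs, which is exactly the situation our scheduler is in.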

What we need here is an application startup event. This way, instead of making the start() method a post-construct callback, we can subscribe it to the startup event. Application lifecycle events like this are supposedly coming as part of CDI 1.1, but until that is released we are going to have to make our own. As it turns out, doing so is trivial if we use a ServletContextListener:

public class ApplicationStartedEvent { //1
}
 
public class ApplicationLifecycleNotifier implements ServletContextListener {
  @Inject Event<ApplicationStartedEvent> started; //2
 
  public void contextInitialized(ServletContextEvent sce) {
    BeanManager manager = (BeanManager) sce.getServletContext()
        .getAttribute(Listener.BEAN_MANAGER_ATTRIBUTE_NAME); //3
    NonContextual.of(getClass(), manager).inject(this); //4
    started.fire(new ApplicationStartedEvent()); //5
  }
 
  public void contextDestroyed(ServletContextEvent sce) {
  }
}
  1. The event bean
  2. The Event object that will be used to fire the event
  3. We need to retrieve the BeanManager so we can inject the context listener. Note that if you are running inside a Java EE container you may retrieve the BeanManager from JNDI
  4. Here we inject the instance using the NonContextual utility provided by the wicket-cdi module
  5. Fire the event

Once it is complete, we have to register it in web.xml:

<web-app>
  ...
  <listener>
    <listener-class>net.ftlines.blog.cdidemo.cdi.ApplicationLifecycleNotifier</listener-class>
  </listener>
  ...
</web-app>

Now that we have a hook for system startup, we can change the scheduler’s start() method to use it:

void start(@Observes ApplicationStartedEvent ase) {...}

Managing Conversation Scope

With the application lifecycle listener in place, SystemReportScheduler will come to life when we start our app, and five highly anticipated seconds later we will see this in the console:

Exception in thread "Thread-2" 
org.jboss.weld.context.ContextNotActiveException: 
WELD-001303 No active contexts for scope type javax.enterprise.context.ConversationScoped

Weld is telling us that there is no active conversation context. Why does this matter? Because EmployeesReportGenerator#generate() calls EmployeesRepository#count(), which in turn calls EntityManager#createQuery(). That last call causes the error: the EntityManager is conversational, but the conversation scope is not active.

To fix the problem we need to wrap the call to EmployeesReportGenerator#generate() with something that will start a conversation before the method and end it after. There are plenty of keywords in the previous sentence to hint that the solution will look like a method interceptor of some sort. Here is the first cut:

@Target({ METHOD, TYPE }) @Retention(RUNTIME) @Documented @InterceptorBinding
public @interface Conversational { //1
}
 
@Conversational
@Interceptor
public class ConversationalInterceptor {
  @Inject @Bound
  BoundConversationContext boundContext;
 
  @AroundInvoke
  public Object wrapInConversation(InvocationContext invocation) throws Exception {
    BoundRequest storage = null; //2
 
    if (!boundContext.isActive()) { //3
      Map<String, Object> session = new HashMap<String, Object>();
      Map<String, Object> request = new HashMap<String, Object>();
      storage = new MutableBoundRequest(request, session); //4
      boundContext.associate(storage);
      boundContext.activate();
    }
 
    try {
      return invocation.proceed(); //5
    } finally {
      if (storage != null) {
        boundContext.deactivate(); //6
        boundContext.dissociate(storage);
      }
    }
  }
}
  1. Annotation that will trigger the interceptor – these kinds of annotations are called Interceptor Binding Annotations in CDI
  2. The storage object where Weld will store the contents of our conversation. We also use this to know if this method invocation was the one that started the conversation, so we can clean up properly and also to share the conversation between nested @Conversational method invocations
  3. Check to see if a conversation context is already active; if it is, we do not start another one
  4. Create the storage for the conversation and activate the context
  5. Invoke the method we are intercepting
  6. If we were responsible for starting the conversation context, shut it down

Unfortunately, we have to use Weld-specific code to manage the conversation scope. Supposedly CDI 1.1 will contain a standard API for doing this, so we will have to wait and see. Until then, however, our application is coupled to Weld. In practice, this is not really a problem because typically applications do not migrate between CDI containers often.
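
The ownership pattern used by the interceptor above (activate only if nothing is active, and tear down only what we started) does not depend on Weld. As a rough plain-Java analogy, here is a sketch built on a ThreadLocal; ThreadScope and its methods are hypothetical and are not part of Weld or CDI.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical thread-bound scope; imitates the activate/deactivate pattern, not Weld's API.
class ThreadScope {
  private static final ThreadLocal<Map<String, Object>> STORAGE = new ThreadLocal<>();

  static boolean isActive() {
    return STORAGE.get() != null;
  }

  // Analogous to touching a conversational resource: fails when no scope is active.
  static Map<String, Object> current() {
    Map<String, Object> storage = STORAGE.get();
    if (storage == null) {
      throw new IllegalStateException("No active scope on thread "
          + Thread.currentThread().getName());
    }
    return storage;
  }

  // Runs the work inside a scope, activating one only if none is active yet.
  static <T> T runInScope(Supplier<T> work) {
    boolean owner = !isActive();
    if (owner) {
      STORAGE.set(new HashMap<>());
    }
    try {
      return work.get();
    } finally {
      if (owner) {
        STORAGE.remove(); // only the outermost call tears the scope down
      }
    }
  }

  static String demo() {
    StringBuilder out = new StringBuilder();
    try {
      current(); // no scope yet, comparable to the worker thread before the fix
    } catch (IllegalStateException e) {
      out.append("inactive;");
    }
    String value = runInScope(() -> {
      current().put("report", "30 employees");
      // The nested call shares the scope started by the outer call.
      return ThreadScope.<String>runInScope(() -> (String) current().get("report"));
    });
    out.append(value).append(";");
    out.append(isActive() ? "still active" : "cleaned up");
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The nested call sees the value stored by the outer one, and the scope is gone once the outermost call returns, which is the behavior we want from nested @Conversational invocations.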

wicket-cdi uses the seam-conversation module, which attempts to standardize conversation scope management between various containers, but so far it does not work with non-HTTP conversations, so we cannot use it here.

In CDI all interceptors are disabled by default, so we enable ours by adding it to beans.xml:

<beans>
<interceptors>
 <class>net.ftlines.blog.cdidemo.cdi.ConversationalInterceptor</class>
</interceptors>
</beans>

The last thing we have to do is to tell CDI to apply our interceptor to the EmployeesReportGenerator#generate() method. This is done by annotating it with the interceptor’s binding annotation we created earlier:

@Conversational
public void generate(@Observes GenerateSystemReportEvent e)

Now when we start our app we will see the following on the console every five seconds:

The system contains: 30 employees

Our worker thread code is now running inside a conversation!

Playing Nice with Http Conversations

If we try to call a @Conversational method from any Wicket code we will get the following error:

Caused by: org.jboss.weld.exceptions.IllegalStateException:
WELD-001304 More than one context active for scope type 
javax.enterprise.context.ConversationScoped

It looks like our conversational interceptor is starting a conversation context even though one is already active, probably started by the wicket-cdi module, which wraps Wicket requests in conversations. We have to make our conversational interceptor aware of the HTTP conversation, which is kept in the unbound conversation context. To read about the unbound and bound contexts, and about contexts in general, see this Weld documentation page.

To fix the problem we have to check both conversation contexts before activating a new one:

@Conversational
@Interceptor
public class ConversationalInterceptor {
 
  @Inject @Http
  ConversationContext context; //1
 
  @Inject @Bound
  BoundConversationContext boundContext; //2
 
  @AroundInvoke
  public Object wrapInConversation(InvocationContext invocation) throws Exception {
 
    BoundRequest storage = null;
 
    if (!context.isActive() && !boundContext.isActive()) { //3
      Map<String, Object> session = new HashMap<String, Object>();
      Map<String, Object> request = new HashMap<String, Object>();
      storage = new MutableBoundRequest(request, session);
      boundContext.associate(storage);
      boundContext.activate();
    }
 
    try {
      return invocation.proceed();
    } finally {
      if (storage != null) {
        boundContext.deactivate();
        boundContext.dissociate(storage);
      }
    }
  }
}
  1. Inject the unbound HTTP conversation context
  2. Inject the bound conversation context
  3. Check to see if either one is active before activating a new one

Now we can annotate our methods as @Conversational without worrying where they are going to be called from.

Throwing in Declarative Transaction Management

When working with worker threads, we frequently need some code to run inside a transaction. For example, if we wanted our EmployeesReportGenerator#generate() to run inside a transaction so that it would use consistent data, we would have to write it like this:

@Conversational
public void generate(@Observes GenerateSystemReportEvent e) {
  em.getTransaction().begin();
  try {
    System.out.println("The system contains: " + employees.count(new EmployeeCriteria()) + " employees");
  } catch (RuntimeException ex) {
    // mark the transaction for rollback and let the exception propagate
    em.getTransaction().setRollbackOnly();
    throw ex;
  } finally {
    if (em.getTransaction().getRollbackOnly()) {
      em.getTransaction().rollback();
    } else {
      em.getTransaction().commit();
    }
  }
}

That is a lot of extra code, and it's the kind of code that we need to repeat over and over again. Let's wrap it in an interceptor:

@Target({ METHOD, TYPE }) @Retention(RUNTIME) @Documented
@InterceptorBinding
public @interface Transactional { //1
}
 
@Transactional
@Interceptor
public class TransactionalInterceptor {
  @Inject EntityManager em;
 
  @AroundInvoke
  public Object wrapInTransaction(InvocationContext invocation) throws Exception {
    boolean owner = !em.getTransaction().isActive(); //2
 
    if (owner) { //3
      em.getTransaction().begin();
    }
 
    try {
      return invocation.proceed();
    } catch (RuntimeException e) {
      em.getTransaction().setRollbackOnly(); //4
      throw e;
    } finally {
      if (owner) { //5
        if (em.getTransaction().getRollbackOnly()) {
          em.getTransaction().rollback();
        } else {
          em.getTransaction().commit();
        }
      }
    }
  }
}
  1. The binding annotation for the interceptor
  2. Determine whether or not this method invocation will be responsible for starting and ending the transaction
  3. Begin the transaction if there is not one yet
  4. If there is an exception mark the transaction for rollback
  5. If we started the transaction clean up
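
To see how the owner flag behaves when transactional methods call each other, the same control flow can be exercised against an in-memory stand-in. This is a sketch under stated assumptions: FakeTransaction is a hypothetical class that only records calls and is not a JPA type, and a Supplier takes the place of the InvocationContext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for EntityTransaction; it only records what happens to it.
class FakeTransaction {
  final List<String> log = new ArrayList<>();
  private boolean active;
  private boolean rollbackOnly;

  boolean isActive() { return active; }
  void begin() { active = true; rollbackOnly = false; log.add("begin"); }
  void setRollbackOnly() { rollbackOnly = true; }
  boolean getRollbackOnly() { return rollbackOnly; }
  void commit() { active = false; log.add("commit"); }
  void rollback() { active = false; log.add("rollback"); }

  // Same control flow as wrapInTransaction above, applied to a Supplier.
  <T> T inTransaction(Supplier<T> work) {
    boolean owner = !isActive();
    if (owner) {
      begin();
    }
    try {
      return work.get();
    } catch (RuntimeException e) {
      setRollbackOnly();
      throw e;
    } finally {
      if (owner) {
        if (getRollbackOnly()) {
          rollback();
        } else {
          commit();
        }
      }
    }
  }

  // A transactional call nested in another one: only the outer call begins and commits.
  static List<String> nestedSuccess() {
    FakeTransaction tx = new FakeTransaction();
    tx.inTransaction(() -> tx.inTransaction(() -> "inner result"));
    return tx.log;
  }

  // A failure in the inner call marks the shared transaction; the outer call rolls it back.
  static List<String> nestedFailure() {
    FakeTransaction tx = new FakeTransaction();
    try {
      tx.inTransaction(() -> tx.inTransaction(() -> {
        throw new IllegalStateException("report failed");
      }));
    } catch (IllegalStateException expected) {
      tx.log.add("exception propagated");
    }
    return tx.log;
  }

  public static void main(String[] args) {
    System.out.println(nestedSuccess());
    System.out.println(nestedFailure());
  }
}
```

In both runs the inner call neither begins nor ends anything; it joins the transaction that the outermost call owns.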

And register the interceptor in beans.xml:

<beans>
<interceptors>
 <class>net.ftlines.blog.cdidemo.cdi.ConversationalInterceptor</class>
 <class>net.ftlines.blog.cdidemo.cdi.TransactionalInterceptor</class>
</interceptors>
</beans>

Now, to wrap the generate() method in a transaction, we simply have to add another annotation to it:

@Conversational @Transactional
public void generate(@Observes GenerateSystemReportEvent e) {...}

Having to add two annotations is rather annoying, though. We can fix this by annotating @Transactional itself with @Conversational, so that both interceptors are applied whenever we use @Transactional:

@Target({ METHOD, TYPE }) @Retention(RUNTIME) @Documented
@InterceptorBinding
@Conversational //1
public @interface Transactional {
}
  1. The added @Conversational annotation

Now we can simply write:

@Transactional
public void generate(@Observes GenerateSystemReportEvent e) {...}

And have the code run inside a transaction that is inside a conversation.
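
The idea that one binding annotation can carry another is ordinary meta-annotation processing. The following plain-Java sketch shows how such bindings could be collected with reflection; it is an illustration, not how Weld resolves interceptors. BindingResolver and the @Binding marker (playing the role of @InterceptorBinding) are hypothetical, and the two annotations are redeclared locally so the example is self-contained.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;

class BindingResolver {
  // Hypothetical marker playing the role of @InterceptorBinding.
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.ANNOTATION_TYPE)
  @interface Binding {}

  @Binding @Retention(RetentionPolicy.RUNTIME)
  @Target({ ElementType.METHOD, ElementType.TYPE })
  @interface Conversational {}

  @Binding @Conversational @Retention(RetentionPolicy.RUNTIME)
  @Target({ ElementType.METHOD, ElementType.TYPE })
  @interface Transactional {}

  static class ReportGenerator {
    @Transactional
    public void generate() {}
  }

  // Collects the bindings on a method, following bindings declared on other bindings.
  static Set<String> bindingsOf(Method method) {
    Set<String> found = new LinkedHashSet<>();
    for (Annotation annotation : method.getAnnotations()) {
      collect(annotation.annotationType(), found);
    }
    return found;
  }

  private static void collect(Class<? extends Annotation> type, Set<String> found) {
    if (!type.isAnnotationPresent(Binding.class) || !found.add(type.getSimpleName())) {
      return; // not a binding, or already visited
    }
    for (Annotation meta : type.getAnnotations()) {
      collect(meta.annotationType(), found);
    }
  }

  static Set<String> demo() {
    try {
      return bindingsOf(ReportGenerator.class.getDeclaredMethod("generate"));
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Although generate() carries only @Transactional, the resolved set contains both bindings, which is why both interceptors end up wrapping the method.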

Conclusion

With just a little bit of code we have added declarative conversation and transaction management. If we look at what we have accomplished in this series of articles up to now we can see the makings of a nice stack which includes persistence, inversion of control, web interface, and worker threads – everything you need to build a fully functional web application.

It is also somewhat apparent that CDI 1.0 is not a perfect specification. It still lacks some refinement that will probably get here in 1.1, but even in its current state it is more than a capable platform.

The code is available here.

6 Comments

  1. Is there a way to make Wicket components (and associated contexts) available to worker threads?

    Consider the use-case of a thread that renders an email using a Wicket page as a template with something like:

    Page page = new HomePage(new PageParameters());
    BufferedWebResponse bufferedWebResponse = new BufferedWebResponse(null);
    page.getRequestCycle().setResponse(bufferedWebResponse);
    page.render();
    String emailToSend = bufferedWebResponse.getText().toString();

    This won’t work because there is no Wicket Application associated to the current ThreadContext. Is there a clean way to set up a ThreadContext for worker threads?

  2. Hi Igor,

    I’m aware of WicketTester and have already tried that approach. The problem with WicketTester is that it sets
    up a new WebApplication which does not have the injection plumbing set up by CDI. I’ll keep digging, and if I come up with a solution I’ll post it here.

    Thanks again for releasing this package and the accompanying blog posts- thanks to you I’m about 90% of the way through migrating an old JSF+Seam app to Wicket+CDI bliss. :-)

    • A quick and dirty way is to store the Application instance in a static var in its init() method. Then you can get to it from any thread, and pass it to WicketTester…

  3. Hi Igor,

    In the TransactionalInterceptor, shouldn’t it be

    boolean owner = !em.getTransaction().isActive();

    ?

    • thanks, fixed!
