28 agosto 2010

Semaphores

Hola como están, hoy estaremos viendo un ejemplo de cómo usar Semaphores en Java, a partir de la versión de 1.5 se hicieron unas extensiones importantes al paquete java.util.concurrent la cual vino acompañada de la clase Semaphore la cual brinda la funcionalidad que necesitaremos.

Para entender su uso lo haremos en base a un ejemplo real, en mi trabajo hace unos días surgió el siguiente issue, tenemos algunas clases singleton que funcionan como Providers, el problema es que a su vez tenemos procesos concurrentes que usan este Provider y cada uno hace un refresh de los datos para tener la ultima versión de los datos. Durante esta semana sucedió que N procesos (Fijaremos N=5) corrieron casi al mismo tiempo por ende cada uno de ellos intento refrescar el singleton todos a la vez como el método que hace el refresh esta sincronizado sucedía que actualizaba los datos para 1 Thread y los otros 4 Thread esperaban, después volvía a refrescar para el siguiente Thread y los restantes esperaban y así sucesivamente, les adjunto una imagen para entender mejor el problema.
Problem Image
Para evitar que los 5 Thread realicen el refresh se decidió ejecutar solo un refresh y encolar los otros 4 esperando que el que está realizando el refresh termine, para esto tuvimos que modificar nuestro singleton y utilizar Semaphores y syncronized por medio de un mutex. Aqui está la clase singleton modificada y ahora explicaremos cada paso.

 
package com.javacuriosities.examples;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class Singleton {

 private static Singleton instance;

 private volatile boolean refreshing = false;

 private Object mutex = new Object();

 private List<Semaphore> pendings = new ArrayList<Semaphore>();

 public synchronized static Singleton getSingleInstance() {
  if (instance == null) {
   instance = new Singleton();
  }
  return instance;
 }

 public void refreshAll(String threadName) {
  if (!checkRefreshingStatus()) {
   refresh(threadName);
  } else {
   Semaphore markPending = addSemaphore(threadName);
   acquireSemaphore(markPending, threadName);
  }
 }

 private void refresh(String threadName) {
  try {
   refreshInfo(threadName);
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   releasePending();
  }
 }

 private void acquireSemaphore(Semaphore markPending, String threadName) {
  try {
   if (markPending != null) {
    System.out.println("Acquire semaphore: " + threadName);
    markPending.acquire();
   } else {
    System.out.println("Avoid semaphore: " + threadName);
   }
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }

 private Semaphore addSemaphore(String threadName) {
  Semaphore markPending = null;
  synchronized (mutex) {
   if (isRefreshing()) {
    markPending = new Semaphore(0, true);
    pendings.add(markPending);
    System.out.println("Add semaphore to: " + threadName);
   }
  }
  return markPending;
 }

 private void releasePending() {
  synchronized (mutex) {
   System.out.println("Release semaphores");
   for (Semaphore semaphore : pendings) {
    semaphore.release();
   }
   pendings.clear();
   refreshing = false;
  }
 }

 private synchronized boolean checkRefreshingStatus() {
  boolean previousState = refreshing;
  if (!refreshing) {
   refreshing = true;
  }
  return previousState;
 }

 private void refreshInfo(String threadName) {
  for (int i = 0; i < 1000; i++) {
   System.out.println("Executing refresh: " + threadName);
  }
 }

 public boolean isRefreshing() {
  return refreshing;
 }
}
Iré explicando todos los cambios que hemos efectuado para conseguir lo que esperábamos a su vez iremos viendo como seria el flujo de ejecución. Nota: La clase ha sido modifica para que sirva para las pruebas y el ejemplo.
1) El método refreshAll dejo de ser synchronized.
2) Apenas comienza este método tenemos un IF el cual llama a un método synchronized donde se pregunta si se está refrescando y se cambia el estado del flag refreshing.
3) El primer Thread que llegue a esta parte entrara por el IF los demás se irán por el ELSE.
4) El Thread 1 comienza a refrescar la información, los otros van por el lado del ELSE y entran en el método addSemaphore como vemos este método usa sincronización por mutex esto es para asegurarnos que no se agregan semaphores a la lista mientras la estamos liberando por otro lado, si la lista no se esta liberando y además aun está corriendo el refresh creamos un semaphore y lo retornamos.
5) Si se pudo crear el semaphore se intentan hacer acquire del mismo, si no se logro crear significa que el refresh termino y no era necesaria la espera. Usamos el método acquire para decirle a un semaphore que se quede esperando hasta que sea liberado esto sería el equivalente a hacer this.wait() y esperar que por otra parte del código se haga un this.notify().
6) Otro tema importante es que si la lógica del refresh falla igual debemos liberar los semaphores pendientes sino estaremos lockeando el proceso., por eso el releasePending se encuentra en un bloque finally.

Notar que los semaphore son creados con 0 permits lo que signica que al hacer acquire quedaran esperando que la cantidad de permits este en 1, esto se logra por medio del método release que incrementa en 1 los permits del semaphore.

Les adjunto un zip con la clase singleton mas una clase de test que dispara 20 Threads. Espero que este post les sea de ayuda para ver algún ejemplo de concurrencia y threading en Java, cualquier cosa que no se entienda del post todas las preguntas son bienvenidas al igual que las criticas.

Este post es en honor a tuky que últimamente está muy curioso sobre temas varios de programación.

Example

Saludos,
Luis

Leer más...