Hola como están, hoy estaremos viendo un ejemplo de cómo usar Semaphores en Java, a partir de la versión de 1.5 se hicieron unas extensiones importantes al paquete java.util.concurrent la cual vino acompañada de la clase Semaphore la cual brinda la funcionalidad que necesitaremos.
Para entender su uso lo haremos en base a un ejemplo real, en mi trabajo hace unos días surgió el siguiente issue, tenemos algunas clases singleton que funcionan como Providers, el problema es que a su vez tenemos procesos concurrentes que usan este Provider y cada uno hace un refresh de los datos para tener la ultima versión de los datos. Durante esta semana sucedió que N procesos (Fijaremos N=5) corrieron casi al mismo tiempo por ende cada uno de ellos intento refrescar el singleton todos a la vez como el método que hace el refresh esta sincronizado sucedía que actualizaba los datos para 1 Thread y los otros 4 Thread esperaban, después volvía a refrescar para el siguiente Thread y los restantes esperaban y así sucesivamente, les adjunto una imagen para entender mejor el problema.
Para evitar que los 5 Thread realicen el refresh se decidió ejecutar solo un refresh y encolar los otros 4 esperando que el que está realizando el refresh termine, para esto tuvimos que modificar nuestro singleton y utilizar Semaphores y syncronized por medio de un mutex. Aqui está la clase singleton modificada y ahora explicaremos cada paso.
Iré explicando todos los cambios que hemos efectuado para conseguir lo que esperábamos a su vez iremos viendo como seria el flujo de ejecución. Nota: La clase ha sido modifica para que sirva para las pruebas y el ejemplo.
package com.javacuriosities.examples;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class Singleton {
private static Singleton instance;
private volatile boolean refreshing = false;
private Object mutex = new Object();
private List<Semaphore> pendings = new ArrayList<Semaphore>();
public synchronized static Singleton getSingleInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
public void refreshAll(String threadName) {
if (!checkRefreshingStatus()) {
refresh(threadName);
} else {
Semaphore markPending = addSemaphore(threadName);
acquireSemaphore(markPending, threadName);
}
}
private void refresh(String threadName) {
try {
refreshInfo(threadName);
} catch (Exception e) {
e.printStackTrace();
} finally {
releasePending();
}
}
private void acquireSemaphore(Semaphore markPending, String threadName) {
try {
if (markPending != null) {
System.out.println("Acquire semaphore: " + threadName);
markPending.acquire();
} else {
System.out.println("Avoid semaphore: " + threadName);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private Semaphore addSemaphore(String threadName) {
Semaphore markPending = null;
synchronized (mutex) {
if (isRefreshing()) {
markPending = new Semaphore(0, true);
pendings.add(markPending);
System.out.println("Add semaphore to: " + threadName);
}
}
return markPending;
}
private void releasePending() {
synchronized (mutex) {
System.out.println("Release semaphores");
for (Semaphore semaphore : pendings) {
semaphore.release();
}
pendings.clear();
refreshing = false;
}
}
private synchronized boolean checkRefreshingStatus() {
boolean previousState = refreshing;
if (!refreshing) {
refreshing = true;
}
return previousState;
}
private void refreshInfo(String threadName) {
for (int i = 0; i < 1000; i++) {
System.out.println("Executing refresh: " + threadName);
}
}
public boolean isRefreshing() {
return refreshing;
}
}
1) El método refreshAll dejo de ser synchronized.
2) Apenas comienza este método tenemos un IF el cual llama a un método synchronized donde se pregunta si se está refrescando y se cambia el estado del flag refreshing.
3) El primer Thread que llegue a esta parte entrara por el IF los demás se irán por el ELSE.
4) El Thread 1 comienza a refrescar la información, los otros van por el lado del ELSE y entran en el método addSemaphore como vemos este método usa sincronización por mutex esto es para asegurarnos que no se agregan semaphores a la lista mientras la estamos liberando por otro lado, si la lista no se esta liberando y además aun está corriendo el refresh creamos un semaphore y lo retornamos.
5) Si se pudo crear el semaphore se intentan hacer acquire del mismo, si no se logro crear significa que el refresh termino y no era necesaria la espera. Usamos el método acquire para decirle a un semaphore que se quede esperando hasta que sea liberado esto sería el equivalente a hacer this.wait() y esperar que por otra parte del código se haga un this.notify().
6) Otro tema importante es que si la lógica del refresh falla igual debemos liberar los semaphores pendientes sino estaremos lockeando el proceso., por eso el releasePending se encuentra en un bloque finally.
Notar que los semaphore son creados con 0 permits lo que signica que al hacer acquire quedaran esperando que la cantidad de permits este en 1, esto se logra por medio del método release que incrementa en 1 los permits del semaphore.
Les adjunto un zip con la clase singleton mas una clase de test que dispara 20 Threads. Espero que este post les sea de ayuda para ver algún ejemplo de concurrencia y threading en Java, cualquier cosa que no se entienda del post todas las preguntas son bienvenidas al igual que las criticas.
Este post es en honor a tuky que últimamente está muy curioso sobre temas varios de programación.
Example
Saludos,
Luis
No se si he entendido el objetivo que se pretende conseguir con los semáforos, ¿Evitar que hilos que comenzaron la espera antes de la última actualización actualicen de nuevo? Es decir, si he ido por una versión actualizada me vale cualquiera posterior al momento en que decidí ir por ella.
ResponderEliminarHola Franci gracias por tomarte el tiempo de leer el post, básicamente lo que intento lograr con los semáforos es que si disparo 5 procesos no se ejecuten 5 refresh sino uno solo y asi ahorrarme el tiempo de espera, porque en la versión cada hilo tenía que esperar los que estaban antes de él y después de esperar a los N previos hacia su refresh.
ResponderEliminarentonces no veo la necesidad de un semaforo, no sería mucho más sencillo como un numero de versión, algo así
ResponderEliminarpublic class Singleton
{
public static Singleton instance;
private final AtomicInteger version = new AtomicInteger();
private final Object mutex = new Object();
public synchronized static Singleton getSingleInstance()
{
if (instance == null)
{
instance = new Singleton();
}
return instance;
}
public void refresh(String threadName)
{
final int ver = version.get();
synchronized(mutex)
{
if(ver==version.get())
{
System.out.println("Executing refresh: "+threadName);
version.incrementAndGet();
}
}
}
}
o un booleano, algo así
public class Singleton
{
public static Singleton instance;
private volatile boolean update = false;
private final Object mutex = new Object();
public synchronized static Singleton getSingleInstance()
{
if (instance == null)
{
instance = new Singleton();
}
return instance;
}
public void refresh(String threadName)
{
update = true;
synchronized(mutex)
{
if(update)
{
System.out.println("Executing refresh: "+threadName);
update = false;
}
}
}
}
Tenes razón es completamente valido lo de la versión o el booleano, pero la ventaja de usar lo semáforos es que puedo desviar el código y no dejarlo wait en el syncronized del mutex, porque aunque las posibilidades son mínimas podría existir el caso donde el Thread que está ejecutando el refresh incremente la versión y otro Thread llegue y tome el id recien incrementado mientras todavía el otro Thread este dentro del mutex. Por lo tanto ese refresh seria innecesario. Al lograr desviar el código y hacer wait por un flujo alternativo tengo este caso cubierto y si no tuve ningún error los demás casos también. Aunque tu solución del booleano tiene la ventaja que si la lógica del refresh falla puedo dejar la variable update en true así el siguiente intenta hacer el refresh, igual tengo como assumption que si la lógica del refresh llegara a fallar, fallaría para todos igual.
ResponderEliminarLo primero descarta la versión del AtomicInteger creo que nada garantiza que la instrucción fuera de sincronización no se traslade al bloque sincronizado según esto
ResponderEliminarhttp://java.sun.com/docs/books/jls/third_edition/html/memory.html#64669
pero sigo pensando que no es necesario, la versión del boolean garantiza que cualquier hilo que llegue a
update = true;
se conformará con el resultado obtenido por cualquier otro hilo que no hubiese llegado hasta ese momento a
update = false;
el tiempo entre esta instrucción y la salida del bloque sincronizado son estas 3 instrucciónes
46: putfield #2; //Field update:Z
49: aload_2
50: monitorexit
si lo que quieres garantizar que se conforme se conformará con el resultado obtenido por otro hilo que no se hubiese iniciado a obtener hasta ese momento el update = false; se pone al principio del bloque del if
Existe un único problema que un hilo puede refrescar innecesariamente (para el) si algún hilo pone a true el valor entre que se puso a false por última vez y el actual llega al punto donde comprueva el valor, pero aún así el número de veces que se refresca el valor no aumenta.
Hola Franci, estuve leyendo el artículo (Aclaración: dentro de mis conocimientos de ingles), y como vos decís nada garantiza que la instrucción fuera de sincronización no se traslade, por ese mismo motivo despacho el flujo para otro lado (Porque de esta manera si me aseguro que aunque haya cambio de Thread no ejecutar el refresh). Respeto a la versión de booleano es justamente el caso que te planteaba antes, como puedo yo saber que en esas 3 instrucciones no habrá un cambio de Thread. Si yo moviera el update como primera línea después del if tendría aun más posibilidades de que sucedan refreshes innecesarios.
ResponderEliminarEspero haber comprendido bien tu explicación y la del artículo y gracias por tomarte el tiempo de buscar otra solución y brindar la explicación.
Saludos,
Luis