package freenet.support; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import freenet.node.PrioRunnable; import freenet.support.io.NativeThread; public class PrioritizedSerialExecutor implements Executor { private static volatile boolean logMINOR; static { Logger.registerLogThresholdCallback(new LogThresholdCallback() { @Override public void shouldUpdate() { logMINOR = Logger.shouldLog(Logger.MINOR, this); } }); } private final LinkedList[] jobs; private final int priority; private final int defaultPriority; private boolean waiting; private final boolean invertOrder; private final Map timeByJobClasses = new HashMap(); private String name; private Executor realExecutor; private boolean running; private static final int NEWJOB_TIMEOUT = 5*60*1000; private final Runner runner = new Runner(); class Runner implements PrioRunnable { Thread current; public int getPriority() { return priority; } public void run() { long lastDumped = System.currentTimeMillis(); synchronized(jobs) { if(current != null) { if(current.isAlive()) { Logger.error(this, "Already running a thread for "+this+" !!", new Exception("error")); return; } } current = Thread.currentThread(); } try { while(true) { Runnable job = null; synchronized(jobs) { job = checkQueue(); if(job == null) { waiting = true; try { //NB: notify only on adding work or this quits early. 
jobs.wait(NEWJOB_TIMEOUT); } catch (InterruptedException e) { // Ignore } waiting=false; job = checkQueue(); if(job == null) { running=false; return; } } } try { if(logMINOR) Logger.minor(this, "Running job "+job); long start = System.currentTimeMillis(); job.run(); long end = System.currentTimeMillis(); if(logMINOR) { Logger.minor(this, "Job "+job+" took "+(end-start)+"ms"); synchronized(timeByJobClasses) { String name = job.toString(); if(name.indexOf('@') > 0) name = name.substring(0, name.indexOf('@')); Long l = timeByJobClasses.get(name); if(l != null) { l = Long.valueOf(l.longValue() + (end-start)); } else { l = Long.valueOf(end-start); } timeByJobClasses.put(name, l); if(logMINOR) { Logger.minor(this, "Total for class "+name+" : "+l); if(System.currentTimeMillis() > (lastDumped + 60*1000)) { Iterator i = timeByJobClasses.entrySet().iterator(); while(i.hasNext()) { Map.Entry e = (Map.Entry) i.next(); Logger.minor(this, "Class "+e.getKey()+" : total time "+e.getValue()); } lastDumped = System.currentTimeMillis(); } } } } } catch (Throwable t) { Logger.error(this, "Caught "+t, t); Logger.error(this, "While running "+job+" on "+this); } } } finally { synchronized(jobs) { current = null; running = false; } } } private Runnable checkQueue() { if(!invertOrder) { for(int i=0;i=0;i--) { if(!jobs[i].isEmpty()) { if(logMINOR) Logger.minor(this, "Chosen job at priority "+i); return jobs[i].removeFirst(); } } } return null; } }; /** * * @param priority * @param internalPriorityCount * @param defaultPriority * @param invertOrder Set if the priorities are thread priorities. Unset if they are request priorities. D'oh! 
*/
	public PrioritizedSerialExecutor(int priority, int internalPriorityCount, int defaultPriority, boolean invertOrder) {
		jobs = new LinkedList[internalPriorityCount];
		// Every priority level needs its own, initially empty, queue; leaving a
		// slot null would make the first isEmpty() check on it throw.
		for(int i=0;i<jobs.length;i++)
			jobs[i] = new LinkedList<Runnable>();
		this.priority = priority;
		this.defaultPriority = defaultPriority;
		this.invertOrder = invertOrder;
	}

	public void start(Executor realExecutor, String name) { this.realExecutor=realExecutor; this.name=name; synchronized (jobs) { boolean empty = true; for(int i=0;i