/* This code is part of Freenet. It is distributed under the GNU General * Public License, version 2 (or at your option any later version). See * http://www.gnu.org/ for further details of the GPL. */ package freenet.support; import java.util.Collection; import com.db4o.ObjectContainer; import freenet.client.FetchException; import freenet.client.FetchResult; import freenet.client.HighLevelSimpleClient; import freenet.client.InsertException; import freenet.client.async.BaseClientPutter; import freenet.client.async.ClientCallback; import freenet.client.async.ClientGetter; import freenet.keys.FreenetURI; import freenet.node.Node; import freenet.node.PrioRunnable; import freenet.support.io.TempBucketFactory; /** * A thread which periodically wakes up and iterates to start fetches and/or inserts. * * When calling start(), the thread will iterate the first time after getStartupDelay() milliseconds. * After each iteration, it will sleep for getSleepTime() milliseconds. * * @author xor */ public abstract class TransferThread implements PrioRunnable, ClientCallback { private final String mName; protected final Node mNode; protected final HighLevelSimpleClient mClient; protected final TempBucketFactory mTBF; private Thread mThread; private volatile boolean isRunning = false; private volatile boolean shutdownFinished = false; private final Collection mFetches = createFetchStorage(); private final Collection mInserts = createInsertStorage(); public TransferThread(Node myNode, HighLevelSimpleClient myClient, String myName) { mNode = myNode; mClient = myClient; mTBF = mNode.clientCore.tempBucketFactory; mName = myName; } protected void start() { mNode.executor.execute(this, mName); Logger.debug(this, "Started."); } /** Specify the priority of this thread. Priorities to return can be found in class NativeThread. */ public abstract int getPriority(); public void run() { isRunning = true; mThread = Thread.currentThread(); try { Thread.sleep(getStartupDelay()); } catch (InterruptedException e) { mThread.interrupt(); } try { while(isRunning) { Thread.interrupted(); try { Logger.debug(this, "Loop running..."); iterate(); long sleepTime = getSleepTime(); Logger.debug(this, "Loop finished. Sleeping for " + (sleepTime/(1000*60)) + " minutes."); Thread.sleep(sleepTime); } catch(InterruptedException e) { mThread.interrupt(); } catch(Exception e) { Logger.error(this, "Error in iterate probably()", e); } } } finally { try { abortAllTransfers(); } catch(RuntimeException e) { Logger.error(this, "SHOULD NOT HAPPEN, please report this exception", e); } finally { synchronized (this) { shutdownFinished = true; notify(); } } } } /** * Wakes up the thread so that iterate() is called. */ public void nextIteration() { mThread.interrupt(); } protected void abortAllTransfers() { Logger.debug(this, "Trying to stop all fetches & inserts..."); abortFetches(); abortInserts(); } protected void abortFetches() { Logger.debug(this, "Trying to stop all fetches..."); if(mFetches != null) synchronized(mFetches) { ClientGetter[] fetches = mFetches.toArray(new ClientGetter[mFetches.size()]); int fcounter = 0; for(ClientGetter fetch : fetches) { /* This calls onFailure which removes the fetch from mFetches on the same thread, therefore we need to copy to an array */ fetch.cancel(null, mNode.clientCore.clientContext); ++fcounter; } Logger.debug(this, "Stopped " + fcounter + " current fetches."); } } protected void abortInserts() { Logger.debug(this, "Trying to stop all inserts..."); if(mInserts != null) synchronized(mInserts) { BaseClientPutter[] inserts = mInserts.toArray(new BaseClientPutter[mInserts.size()]); int icounter = 0; for(BaseClientPutter insert : inserts) { /* This calls onFailure which removes the fetch from mFetches on the same thread, therefore we need to copy to an array */ insert.cancel(null, mNode.clientCore.clientContext); ++icounter; } Logger.debug(this, "Stopped " + icounter + " current inserts."); } } protected void addFetch(ClientGetter g) { synchronized(mFetches) { mFetches.add(g); } } protected void removeFetch(ClientGetter g) { synchronized(mFetches) { mFetches.remove(g); } Logger.debug(this, "Removed request for " + g.getURI()); } protected void addInsert(BaseClientPutter p) { synchronized(mInserts) { mInserts.add(p); } } protected void removeInsert(BaseClientPutter p) { synchronized(mInserts) { mInserts.remove(p); } Logger.debug(this, "Removed insert for " + p.getURI()); } protected int fetchCount() { synchronized(mFetches) { return mFetches.size(); } } protected int insertCount() { synchronized(mInserts) { return mInserts.size(); } } public void terminate() { Logger.debug(this, "Terminating..."); isRunning = false; mThread.interrupt(); synchronized(this) { while(!shutdownFinished) { try { wait(); } catch (InterruptedException e) { Thread.interrupted(); } } } Logger.debug(this, "Terminated."); } protected abstract Collection createFetchStorage(); protected abstract Collection createInsertStorage(); protected abstract long getStartupDelay(); protected abstract long getSleepTime(); /** * Called by the TransferThread after getStartupDelay() milliseconds for the first time and then after each getSleepTime() milliseconds. */ protected abstract void iterate(); /* Fetches */ /** * You have to do "finally { removeFetch() }" when using this function. */ public abstract void onSuccess(FetchResult result, ClientGetter state, ObjectContainer container); /** * You have to do "finally { removeFetch() }" when using this function. */ public abstract void onFailure(FetchException e, ClientGetter state, ObjectContainer container); /* Inserts */ /** * You have to do "finally { removeInsert() }" when using this function. */ public abstract void onSuccess(BaseClientPutter state, ObjectContainer container); /** * You have to do "finally { removeInsert() }" when using this function. */ public abstract void onFailure(InsertException e, BaseClientPutter state, ObjectContainer container); public abstract void onFetchable(BaseClientPutter state, ObjectContainer container); public abstract void onGeneratedURI(FreenetURI uri, BaseClientPutter state, ObjectContainer container); /** Called when freenet.async thinks that the request should be serialized to * disk, if it is a persistent request. */ public abstract void onMajorProgress(ObjectContainer container); public boolean objectCanNew(ObjectContainer container) { Logger.error(this, "Not storing TransferThread in database", new Exception("error")); return false; } }