【Android】Executorを使用して独自の非同期処理を実装する(2)
前回はExecutor、って言うか、java.util.concurrentパッケージを使って非同期処理を行う方法、Androidで使うならHandlerも必要だよねって話、でもその辺全部を実装する汎用的なクラスを作るとAsyncTaskになっちゃうよ、と言う話をしました。
今回は実際に作って動かしてみます。
独自の非同期処理を実装する前に
結局のところ、これから作るものはAsyncTaskから汎用性を取っ払って完全に画像読み込み処理専用にしてしまうのが目的です。
なので、AsyncTaskのソースは非常に参考になります。今回は4.0.1でのソースをベースに解説していきます。
ThreadPoolExecutorを作成する
まずは実際に動かすExecutorの具象クラスとなるThreadPoolExecutorを定義していきます。
基本的に継承を行う必要はほとんどないです。と言うのも、必要なものはコンストラクタで大体全部設定できるからです。
- ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
- ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
- ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
- ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
つまるところ、以下の4項目が必須要素です
- int corePoolSize - プール内に保持するスレッドの数 (アイドル状態のスレッドも含む)
- int maximumPoolSize - プール内で可能なスレッドの最大数
- long keepAliveTime - スレッドの数がコアよりも多い場合、これは超過したアイドル状態のスレッドが終了前に新しいタスクを待機する最大時間
- BlockingQueue<Runnable> workQueue - タスクの実行前に、そのタスクを保持するために使用するキュー。
そして以下の2項目がオプション的な要素です。
- ThreadFactory threadFactory - executorが新しいスレッドを作成するときに使用するファクトリ
- RejectedExecutionHandler handler - スレッドの境界に達し、キューの容量に達したため、実行がブロックされたときに使用されるハンドラ
と言っても必須要素含めて全部にgetter / setterが用意されてるんですけどね。
とりあえずAsyncTaskではこのあたりをどう定義しているか見ていきましょう。
private static final int CORE_POOL_SIZE = 5; private static final int MAXIMUM_POOL_SIZE = 128; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(10); public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
まぁ見たまんまって感じです。
corePoolSizeが5なので常時5スレッドをプールしています。maximumPoolSizeは128なので128スレッドまでプールできます。workQueueのサイズが10なので10個以上のキューを保持することはできません。
corePoolSizeとmaximumPoolSizeとworkQueueのサイズの関係はちょっとややこしいです。まずcorePoolSize分のスレッドのうち、暇そうな奴がいればそいつにタスクを任せます。corePoolSize分のスレッドが全員忙しそうにしていた場合はworkQueueにキューイングしておきます。corePoolSize分のスレッドが全員稼働中かつ、workQueueにこれ以上キューイングできない場合はmaximumPoolSizeの値を見つつスレッドを増やします。(この時、新たに作成されたスレッドが優先して実行されてしまう。)
と言うわけで、maximumPoolSize > workQueueのサイズ > corePoolSizeとなるように設定するのがベターだと思われます。あんまりcorePoolSizeを大きくしすぎるとアイドル状態のスレッドを持て余してしまいますし、あんまり小さくしすぎると何度もスレッドが増減してプーリングの恩恵を得られません。
一般的にcorePoolSizeはCPUのコア数にあわせるといいと言われています。まァ、どんな言語だろうとスレッドが増えても処理できるCPUがなければあんまり意味ないですからね。一応下記のコードで端末のコア数を調べることができます。
int coreSize = Runtime.getRuntime().availableProcessors();
ただ、Runtime#availableProcessorsの説明を読む限り、Android 4.2以前の場合は正確な値は返ってきませんし、本来より低めの値が返ってきた結果ロクにプーリングできないのも困ります。どうせ汎用的なものを作るつもりはないので処理内容からざっと推測し、ある程度決め打ってしまってもいいんじゃないでしょうか。あるいは、キューのサイズを大きめにとっておくか。
他の要素も見ていきましょう。keepAliveTimeは1秒として設定されています。いきなりいっぱいタスクがきてcorePoolSizeよりスレッド数が大きくなってしまったんだけど、ある程度捌き終わり、増やしたスレッドがアイドルになったら1秒後に殺すわけですね。逆に言えば、1秒以内にまた大量のタスクがきたらそのままキリキリと働いてもらいます。次にthreadFactoryですが、これはスレッドごとにユニークな名前をつけているだけです。
AsyncTaskではRejectedExecutionHandlerが設定されていません。これが何をするのかと言うと、maximumPoolSize以上のスレッドを作成しようとした時の例外処理を行います。
逆にこれが設定されていないと、Executor#executeを呼び出した瞬間に未処理のRejectedExecutionExceptionが飛んできます。
例外が飛んでくるのはちょっと…と言うケースもあるでしょう。RejectedExecutionHandlerを実装した匿名クラスを作成してもいいんですが、ある程度決まりきった処理をする場合がほとんどだろうと思ったのか、ThreadPoolExecutor内に以下のインナークラスが用意されています。
- ThreadPoolExecutor.AbortPolicy
- ThreadPoolExecutor.CallerRunsPolicy
- ThreadPoolExecutor.DiscardOldestPolicy
- ThreadPoolExecutor.DiscardPolicy
それぞれがどんな動作をするのかは、各クラスのrejectedExecutionメソッドの説明を読んだり、この辺の記事を読むなどしましょう。どれにもそぐわなかったら結局作るしかないんですがね。
で、最終的に私はこんな感じにしました。
private static final int MAX_THREAD = 128; private static ThreadPoolExecutor _executor; static { _executor = init(); } private static ThreadPoolExecutor init() { return new ThreadPoolExecutor(3, MAX_THREAD , 1L, TimeUnit.SECOND , new LinkedBlockingQueue<Runnable>(10) , new ThreadFactory() { private final AtomicInteger _count = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "HttpImageManager #" + _.getAndIncrement()); t.setDaemon(true); return t; } }, new RejectedExecutionHandler(){ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if(r instanceof ImageFuture) ((ImageFuture) r).cancel(true); } }); }
RejectedExecutionHandlerは特に気にしなくていいです。例外を出されても困るので処理をさっさとキャンセルするだけです。また、なぜわざわざinitなんてメソッドを用意したのかは後述します。
一応スレッドをデーモン化しておいてありますが、Androidにおけるデーモンスレッドとメインスレッドの関係はいまいちよくわかりませんでした。メインスレッドが死んでるならもういいでしょ、ぐらいの気持ちでやってます。
まぁぶっちゃけた話、AsyncTask丸パクリでもいいと思います。RejectedExecutionHandlerとスレッド数に関わる値をちょちょっといじるぐらいで十分です。
Callable、FutureTask、Handlerを作成する
じゃあ実際の処理を組み立てていきましょう。AsyncTaskではこんな風にしています。
private static final InternalHandler sHandler = new InternalHandler(); private final WorkerRunnable<Params, Result> mWorker; private final FutureTask<Result> mFuture; private final AtomicBoolean mTaskInvoked = new AtomicBoolean(); public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); return postResult(doInBackground(mParams)); } }; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { final Result result = get(); postResultIfNotInvoked(result); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occured while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } catch (Throwable t) { throw new RuntimeException("An error occured while executing " + "doInBackground()", t); } } }; } private void postResultIfNotInvoked(Result result) { final boolean wasTaskInvoked = mTaskInvoked.get(); if (!wasTaskInvoked) { postResult(result); } } private Result postResult(Result result) { Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT, new AsyncTaskResult<Result>(this, result)); message.sendToTarget(); return result; } protected final void publishProgress(Progress... values) { if (!isCancelled()) { sHandler.obtainMessage(MESSAGE_POST_PROGRESS, new AsyncTaskResult<Progress>(this, values)).sendToTarget(); } } private void finish(Result result) { if (isCancelled()) { onCancelled(result); } else { onPostExecute(result); } mStatus = Status.FINISHED; } // 以下、AsyncTaskのinner classes private static class InternalHandler extends Handler { @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult result = (AsyncTaskResult) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } } private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; } @SuppressWarnings({"RawUseOfParameterizedType"}) private static class AsyncTaskResult<Data> { final AsyncTask mTask; final Data[] mData; AsyncTaskResult(AsyncTask task, Data... data) { mTask = task; mData = data; } }
WorkerRunnable(Caller)でdoInBackgroundを呼び出し、その結果をHandlerに渡しています。
Process#setThreadPriorityとは、まぁそのままの意味で実行するスレッドの優先度に関わってきます。どのような値をセットできるかはこちらの記事を読むのが一番わかりやすいと思われます。で、読んでみればわかると思いますが、THREAD_PRIORITY_BACKGROUND以外の選択肢はないと言っても過言ではありません。
AsyncTaskで使っているHandlerの実体はインナークラスで持っています。InternalHandlerと言う、まんまの名前です。postResultではそのまま処理結果を、doInBackground内でpublishProgressメソッドが呼ばれた場合はonProgressUpdateをUIスレッドに委譲しています。
この様にMessageのwhatに渡す値でHandlerへ委譲する処理を分岐させられるわけですね。まぁこの辺はHandlerを使ったことがある人なら知ってるでしょう。
なぜCallerでもFutureTaskでもHandlerに渡しているのかはよくわかりません。
で、実際に作ってみたものです。
/** Handler内で使用するコールバック */ public interface ImageCallback { void call(Object key, Bitmap bitmap); } /** Handler内で使用するエラーハンドラ */ public interface ErrorHandler { void call(Exception e); } /** バックグラウンドスレッドで実行する内容(Caller) */ public static class ImageCaller implements Callable<BitmapResult> { private final String _url private final Point _maxSize; private boolean _isSaveCache; private boolean _isSaveFile; private int _timeout = 30000; public ImageCaller(String url, Point maxSize) { _url = url; _maxSize = maxSize; } public void setSaveCache(boolean isSaveCache) { _isSaveCache = isSaveCache; } public void setSaveFile(boolean isSaveFile) { _isSaveFile = isSaveFile; } public void setTimeout(int timeout) { _timeout = timeout; } @Override public BitmapResult call() throws Exception { android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_BACKGROUND); return ImageUtil.getBitmapFromHttp(_url, _maxSize, _timeout, _isSaveCache, _isSaveFile); } } /** ImageCallerの処理結果を受け取り、ImageHandlerに渡すFutureTask */ static class ImageFuture extends FutureTask<BitmapResult> { private ImageHandler _handler; public ImageFuture(Object key, ImageCaller caller, ImageCallback callback, boolean isThrowException) { super(caller); _handler = new ImageHandler(key, callback, isThrowException); } public ImageFuture(Object key, ImageCaller caller, ImageCallback callback, ErrorHandler onError) { super(caller); _handler = new ImageHandler(key, callback, onError); } @Override protected void done() { // 例外が発生してなければMessage#whatにMSG_CALLBACKを、objにbitmapを渡す // (Executor関連も含む)例外が発生した場合はMessage#whatにMSG_ONERRORを、objにExceptionを渡す try { BitmapResult result = get(); if(!result.hasError()) { _handler.obtainMessage(MSG_CALLBACK, result.getBitmap()).sendToTarget(); } else { _handler.obtainMessage(MSG_ONERROR, result.getError()).sendToTarget(); } } catch (InterruptedException e) { _handler.setNotThrowException(); _handler.obtainMessage(MSG_ONERROR, e).sendToTarget(); } catch (ExecutionException e) { _handler.obtainMessage(MSG_ONERROR, e).sendToTarget(); } catch (CancellationException e) { _handler.setNotThrowException(); _handler.obtainMessage(MSG_ONERROR, e).sendToTarget(); } catch (Exception e) { _handler.setNotThrowException(); _handler.obtainMessage(MSG_ONERROR, e).sendToTarget(); } finally { _handler = null; } } } /** UIスレッド内で実行する内容(Handler) */ static class ImageHandler extends Handler { private Object _key; private ImageCallback _callback; private ErrorHandler _onError; private boolean _isThrowException; // コンストラクタはHandler(Looper)を継承する // UIスレッドで確実に実行するため、LooperはLooper#getMainLooper()のみを使用する public ImageHandler(Object key, ImageCallback callback, boolean isThrowException) { super(Looper.getMainLooper()); _key = key; _callback = callback; _isThrowException = isThrowException; } public ImageHandler(Object key, ImageCallback callback, ErrorHandler onError) { super(Looper.getMainLooper()); _key = key; _callback = callback; _onError = onError; } public void setNotThrowException() { _isThrowException = false; } @Override public void handleMessage(Message msg) { if(_keyHolder != null) _keyHolder.remove(_key); switch (msg.what) { case MSG_CALLBACK: _callback.call(_key, (Bitmap) msg.obj); break; case MSG_ONERROR: Exception e = (Exception) msg.obj; if(_onError != null) { _onError.call(e); } else if(_isThrowException) { _callback = null; _onError = null; _key = null; new RuntimeException(e); } else { e.printStackTrace(); } break; } _callback = null; _onError = null; _key = null; } }
具体的にどうやって画像を処理してるかだとか、Object keyって何やねんって言うのは、これを読んでください。と言うかこれをなるべくそのままの形で移植できるよう作ったので、HttpImageLoaderをそのまま3つに分解したと言ったほうがいいのかもしれません。
次回の内容
今回で終わらせる気満々だったんですが、文字数が足りなくなってしまいました。
次回はAsyncTaskでは行われていないExecutorのシャットダウン方法の説明と、コードの完全版をはっつけて終わりにする予定です。