読者です 読者をやめる 読者になる 読者になる

【Ruby】非同期処理の継続を作ってみる

一年に一つは知らない言語を触ってみよう、と言う自分ルールがあって、今年はRubyにしようかな、と環境を構築しました。

個人的にRubyのコミュニティに属している人たちの色々な言い方があんまり好きじゃない(解説サイトでもすぐに「驚き」とか「魔法」みたいな単語が出てきて既に辟易としている)んだけど、まぁ「人を憎んで仕様を憎まず」と言う言葉もきっと捜せばあるでしょう。

そう言うあざとい宣伝文句も含めてPerlの血を引いているんだと思えば少しだけ気にならなくなりました。

Rubyのマルチスレッド処理には継続がない

そんなわけでこの解説サイトを見ながら二日ほどちまちまとコードを書いていて、ちょうどやりたかったTumblrの記事のバックアップスクリプトを書き終えたんですが、どうにも遅いです。

と言うのも、これはもうどうしようもないんですが、offsetを少しずつ増やして何度も何度もTumblrAPIを叩かなきゃいけないので、通信のオーバーヘッドが馬鹿にならないんですよね。

そんなわけでスレッド立てまくって同時に通信しまくってやろうと思ったわけです。

Rubyのスレッドに関する資料はこの辺を参考にしました。

で、読んでみると継続に該当するメソッドがない。Ruby書いてる人はあんまりマルチスレッドプログラミングやらないんですかね?

多分、Queueとjoinとlambdaを組み合わせれば普通に作れるよね、ってことで、かなり雑なものを作りました。

コード

require "thread"

class AsyncTask
  
  def initialize(on_background, *params)
    this = self
    @result = Queue.new
    
    # Thread.newでもThread.startでも即実行されるので
    # AsyncTask.startが呼ばれるまではクロージャに封じ込めておく
    @task = lambda {
      Thread.new {
        begin
          @result.push(on_background.call(*params))
        rescue Exception => ex
          @exception = ex
        ensure
          @is_end = true
          @on_exuecuted.call(this) if @on_exuecuted
          kill
        end
      }
    }
  end
  
  def start
    @thread = @task.call unless @thread
    return self
  end
  
  def wait
    @thread.join if @thread
  end
  
  def result
    if @exception then raise @exception end
    @result.pop
  end
  
  def status
    @thread.status if @thread
  end
  
  def kill
    Thread.kill(@thread) if @thread
  end
  
  def continue(&on_exuecuted)
    # コンストラクタで渡した処理が既に終わってたら即実行する
    unless @is_end then
      @on_exuecuted = on_exuecuted
    else
      on_exuecuted.call(self)
    end
    
    return self
  end
end

名前はAsyncTaskだけど実装は.NETのTaskをそれとなく意識しています。

実際の実装には到底及ばないんですが、ま、書き捨てのスクリプトなのでスニペット代わりに使う分には十分でしょう。

使い方はこんな感じ。

# コンストラクタの第一引数はlambda以外(blockとかProc)だと
# returnできないので注意
# 静的言語ならこんな注意もいらなかったんですが…。
t = AsyncTask.new(lambda {
  puts "hoge"
  return "end"
}).continue{|r|
  # 何らかの例外が発生してたらresultを参照してもちゃんとExceptionがとんでくる
  # 流石にAggregateExceptionを実装する気力はない
  puts r.result
}.start

puts "main thread end"

t.wait

なんかRubyのThreadには引数も渡せるらしいので、こんな風にもできます。

# コンストラクタに渡した可変長引数をクロージャの引数にする
t = AsyncTask.new(lambda {|x, y|
  puts x
  puts y
  return "end"
}, "hoge", "fuga").continue{|r|
  puts r.result
}.start

勿論コンストラクタに渡すクロージャは返り値がなくても問題ありません。終わったことだけ通知してくれればいいことも多々ありますしね。

まとめ

で、作ったはいいんですが、GVLの関係でやっぱり大した速度があがりません。

明日はその辺を調べたいなと思います。何か記事を書くかどうかは別として。

参考