@ledsun blog

無味の味は佳境に入らざればすなわち知れず

EC2のvCPU数はハイパースレッディングを含む

何並列までいけるのか? - @ledsun blog で4並列で頭打ちになりました。 IOの影響をさらにそぎ落とすために、計測スクリプトを次のように改良しました。

require 'benchmark'
require 'text_alignment'
require 'active_support'
require_relative 'config/initializers/ractor.rb'

MAX=1

test_data = Dir["./tmp/send_data20221125-4914*/*"].map do |file|
  Marshal.load(File.binread(file))
end

time = Benchmark.realtime do
  pipe = Ractor.new do
    loop do
      Ractor.yield Ractor.receive
    end
  end

  workers = (1..MAX).map do
    Ractor.new pipe do |pipe|
      while msg = pipe.take
        aligner = TextAlignment::TextAlignment.new(msg[:ref_text], msg[:options])
        results = msg[:data].map do |datum|
          begin
            aligner.align(datum[:text], datum[:denotations] + datum[:blocks])

            {
              denotations: aligner.transform_hdenotations(datum[:denotations]),
              blocks: aligner.transform_hdenotations(datum[:blocks]),
              lost_annotations: aligner.lost_annotations,
              block_alignment: aligner.lost_annotations.present? ? aligner.block_alignment : nil
            }
          rescue => e
            break {
              error: e.message
            }
          end
        end

        Ractor.yield(Ractor.make_shareable({
          index: msg[:index],
          results: results
        }), move: true)
      end
    end
  end

  test_data.each do |send_data|
    pipe.send(send_data)
  end.each do
    _r, results = Ractor.select(*workers)
  end
end

p MAX, time

そして再計測してみました。

t3.2xlaregインスタンス上での並列数の計測結果

やはり4並列で頭打ちです。

ここでt3.2xlaregのコア数を確認してみます。

t3.2xlaregのvCPU数は8

あれ?もしかしてvCPU数ってコア数じゃないですか?

CPU オプションの最適化 - Amazon Elastic Compute Cloud

Amazon EC2 インスタンスは、単一の Intel Xeon CPU コアで同時に複数のスレッドを実行できるマルチスレッドをサポートしています。各スレッドは、インスタンスの仮想 CPU (vCPU) として表されます。インスタンスには、インスタンスタイプによって異なるデフォルト数の CPU コアがあります。例えば、m5.xlarge インスタンスタイプには 2 つの CPU コアがあり、デフォルトでは各コアごとに 2 つのスレッドの合計で 4 つの vCPU があります。

もしかしてvCPU 8ってことは、4コア8スレッドってことですか? なるほど4並列を越えて試すには、t3.2xlargeは不適切だったようです。

何並列までいけるのか?

Ractorちゃん並列化してたー。 - @ledsun blog でRactorで並列に動かせていることを確認しました。 しかし4コアのマシンで確認したため、並列数の上限がわかりませんでした。 そこでAWS上のEC2インスタンスを使って並列数の上限を調べます。 t3.2xlargeインスタンスを使います。 t3.2xlargeインスタンスは8コアあります。 8並列までを計測しました。

t3.2xlaregインスタンス上での並列数の計測結果

4並列でピタっと止まりました。 データの投入速度が足りていないのでしょうか?

ruby.wasmではファイルに書き込めない

require_relativeはパッチれない - @ledsun blog で、Rubyスクリプトの実行前に、Rubyスクリプトを静的にあつかってrequire_relativeを置き換えることにしました。 そこで次のアイデアを思いつきました。

Rubyスクリプトの実行前に、require_relativeの参照先スクリプトを取得して、RubyVMのファイルシステムに書き込んでしまえばいいのでは?

これができると次のメリットがあります。

  • fetch(Promise)を待つのが、RubyVMの外側になる
  • Rubyスクリプトをfetchで取得できる
  • 並列ダウンロードが可能になる

というわけで、ruby.wasmでファイリングシステムに書き込みができるか試してみます。

Rubyでファイルを書き込むワンライナーです。 irbで動くことを確認します。

irb(main):004:0> File.open("test.rb", "w", 0755) { _1.write "hello" }
=> 5

これをruby.wasmで動かすと

ruby.wasmでファイルを作ろうとすると例外が起きます。

class Errno::ENOTCAPABLE (Ruby 3.1 リファレンスマニュアル)

システムコールのエラーコードを表す例外クラスです。

ファイルをオープンできないみたいです。

あれ?でもrequireで標準Gemは読み込めます。 例えば'require "csv"'は実行可能です。 何か方法があるのかもしれません。

JS::Object.await

fetchは非同期だった - @ledsun blog でfetchの完了をRuby側で待てなくて、require_relativeの順番の制御が上手く行きませんでした。 ruby.wasmでJS::Object.awaitという関数を見つけました。 名前からするとPromiseを待てそうな関数です。 これを使えばfetchでrequire_relativeが実現できるのでしょうか?

次のようなrequire_relativeの代替え関数を用意します。

function rb_require_relative(relative_feature): Promise<boolean> {
  const filename = relative_feature.endsWith(".rb")
    ? relative_feature
    : `${relative_feature}.rb`;
  const url = new URL(filename, vm.currentURL);

  // Prevents multiple loading.
  if (loadedPathes.has(url.pathname)) {
    return Promise.resolve(false);
  }

  return new Promise((resolve, reject) => {
    fetch(url).then((response) => {
      if (response.ok) {
        response.text().then((text) => {
          new RubyScriptWithSourceURL(vm, url, text).eval();
          resolve(true);
        });
      }
      resolve(false);
    });
  });
};

これを次のようなRubyの関数で待てれば良さそうです。

require "js"
module Kernel
  def patched_require_relative(relative_feature)
    JS.global.rb_require_relative(relative_feature).await
  end
end

試して見たところ、そうはならないんですよ。

This method looks like a synchronous method, but it actually runs asynchronously using fibers.

コメントにこう書いてあるとおり、実際には非同期に実行されるので、require_relativeの実行順序は維持できないのでした。

Ractorちゃん並列化してたー。

インスタンス生成コストが大きかった - @ledsun blog までやった結果、Ractor間で受け渡しているデータ構造が大分はっきりしました。

ここまできたら、送信データを保存しておけば、Ractor化している処理だけで実行出来るはずです。 次のような感じで、Ractorに送信するデータをシリアライズしてファイルに保存しておきます。

File.open("#{path}/#{doc.sourcedb}_#{doc.sourceid}", "wb", 0755) do |f|
  send_data = Marshal.dump({
    index: index,
    ref_text: ref_text,
    options: options,
    data:data
  })
  f.write(send_data)
end

このデータファイルを読み込んでワーカーRactorに送り込めば、より厳密に並列化の効果が測定できます。 テキストアライメント処理のまえのデータ読み込みの部分をスキップできます。

次のスクリプトをつくって動かしてみます。

require 'text_alignment'
require 'active_support'
require_relative 'config/initializers/ractor.rb'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

workers = (1..4).map do
  Ractor.new pipe do |pipe|
    while msg = pipe.take
      aligner = TextAlignment::TextAlignment.new(msg[:ref_text], msg[:options])
      results = msg[:data].map do |datum|
        begin
          aligner.align(datum[:text], datum[:denotations] + datum[:blocks])

          {
            denotations: aligner.transform_hdenotations(datum[:denotations]),
            blocks: aligner.transform_hdenotations(datum[:blocks]),
            lost_annotations: aligner.lost_annotations,
            block_alignment: aligner.lost_annotations.present? ? aligner.block_alignment : nil
          }
        rescue => e
          break {
            error: e.message
          }
        end
      end

      Ractor.yield(Ractor.make_shareable({
        index: msg[:index],
        results: results
      }), move: true)
    end
  end
end

Dir["./tmp/send_data20221122-2664*/*"].each.with_index do |file|
  send_data = Marshal.load(File.binread(file))
  pipe.send(send_data)
end.each do
  _r, results = Ractor.select(*workers)
end

ワーカー数を1~4まで変えて処理時間をグラフ化します。

並列化が効いているグラフ

でました放物線ぽいグラフです! 4コアのローカルPCで試しました。 これ以上の並列数はAWS EC2環境で計測する必要があります。 とはいえ、ここまで典型的なグラフがでたなら、割と良い感じに並列化できてそうです。

Ruby並行・並列くらべ - @ledsun blog の頃から、ずっとRactorで並列化できているっぽいけど、おもったより性能伸びないなあ・・・と悩んでいました。 2ヶ月前です。 気がつけば当たり前の話です。 並列化したところは高速化できていました。 高速化したぶんボトルネックが、並列化してない処理に移動していただけでした。

Stackprofで時間の掛かる処理を探そうとして上手く行かなかった話 - @ledsun blog でStackprofしたときは、上手くデータが取れませんでした。 とれていたら、もうちょっと早く気がつけていたのでしょうか?

インスタンス生成コストが大きかった

Ractor化する範囲を小さくしたら遅くなった - @ledsun blog で、Ractor化する範囲を小さくしたら処理が遅くなって混乱しました。 よくよくソースコードを確認したところ、変わっている場所がありました。

aligner = TextAlignment::TextAlignment.new(msg[:ref_text], msg[:options])
aligner.align(msg[:text], msg[:denotations] + msg[:blocks])

ここです。 元は次でした。

aligner = TextAlignment::TextAlignment.new(ref_text, o)
copy_a = Marshal.load(Marshal.dump(a))
m = copy_a.map do |annotation|
  Annotation.align_annotations!(annotation, ref_text, aligner)
end.flatten

TextAlignment::TextAlignment.newがループの中に入っています。 このインスタンス生成が遅くなった原因のようです。 確認のために次のようにソースコードを修正しました。

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

workers = (1..4).map do
  Ractor.new pipe do |pipe|
    while msg = pipe.take
      aligner = TextAlignment::TextAlignment.new(msg[:ref_text], msg[:options])
      results = msg[:data].map do |datum|
        begin
          aligner.align(datum[:text], datum[:denotations] + datum[:blocks])

          {
            denotations: aligner.transform_hdenotations(datum[:denotations]),
            blocks: aligner.transform_hdenotations(datum[:blocks]),
            lost_annotations: aligner.lost_annotations,
            block_alignment: aligner.lost_annotations.present? ? aligner.block_alignment : nil
          }
        rescue => e
          {
            error: e
          }
          break
        end
      end

      Ractor.yield(Ractor.make_shareable({
        index: msg[:index],
        results: results
      }), move: true)
    end
  end
end

annotations_collection_with_doc.each_with_index do |a_and_d, index|
  annotations, doc = a_and_d
  ref_text = doc&.original_body || doc.body
  results = {}

  targets = annotations.filter {|a| a[:denotations].present? || a[:blocks].present? }
  data = targets.map do |annotation|
    # align_hdenotations
    text = annotation[:text]
    denotations = annotation[:denotations] || []
    blocks = annotation[:blocks] || []

    {
      text: text,
      denotations: denotations,
      blocks: blocks
    }
  end
  pipe.send(Ractor.make_shareable({
    index: index,
    ref_text: ref_text,
    options: options,
    data:data
  }))
end.each do |annotations, doc|
  _r, results = Ractor.select(*workers)

  annotations, doc = annotations_collection_with_doc[results[:index]]
  ref_text = doc&.original_body || doc.body
  targets = annotations.filter {|a| a[:denotations].present? || a[:blocks].present? }

  messages << results[:results].map.with_index do |result, i|
    if result[:error]
      raise "[#{annotation[:sourcedb]}:#{annotation[:sourceid]}] #{result[:error].message}"
    else
      annotation = targets[i]
      annotation[:denotations] = result[:denotations]
      annotation[:blocks] = result[:blocks]
      annotation[:text] = ref_text
      annotation.delete_if{|k,v| !v.present?}

      if result[:lost_annotations].present?
        {
          sourcedb: annotation[:sourcedb],
          sourceid: annotation[:sourceid],
          body:"Alignment failed. Invalid denotations found after transformation",
          data:{
            block_alignment: result[:block_alignment],
            lost_annotations: result[:lost_annotations]
          }
        }
      else
        nil
      end
    end
  end.compact
end

計測してみます。

最新版の計測結果

比較のため非Ractor版と修正前のRactor版も載せます。

非Ractor版の計測結果

修正前のRactor版の計測結果

Ractor化する前よりは速くなりました。 修正前よりは遅いです。 まだ、なにか見落としている差分がありそうです。

とはいえ、オブジェクトのディープコピーを減らしてもめざましい効果がないことがわかりました。 「Ractor間のコピーに時間が掛かる」という説は間違っていそうです。 もしかするとRactor間でデータを受け渡すための入れ物として配列やハッシュを作っている部分がコストになっているのでしょうか?

Ractor化する範囲を小さくしたら遅くなった

GCの影響は少なそう - @ledsun blog で、コピーする範囲を小さくすることで高速化できそうと考えました。 そこで、根本的にRactor化する範囲を小さくします。 そうすることでRactorで受け渡しするデータを最小化し、なんならmoveします。

次のようなソースコードにしました。

    pipe = Ractor.new do
      loop do
        Ractor.yield Ractor.receive
      end
    end

    workers = (1..4).map do
      Ractor.new pipe do |pipe|
        while msg = pipe.take
          begin
            aligner = TextAlignment::TextAlignment.new(msg[:ref_text], msg[:options])
            aligner.align(msg[:text], msg[:denotations] + msg[:blocks])

            Ractor.yield(Ractor.make_shareable({
              index: msg[:index],
              denotations: aligner.transform_hdenotations(msg[:denotations]),
              blocks: aligner.transform_hdenotations(msg[:blocks]),
              lost_annotations: aligner.lost_annotations,
              block_alignment: aligner.block_alignment
            }), move: true)
          rescue => e
            Ractor.yield(Ractor.make_shareable({
              error: e
            }))
          end
        end
      end
    end

    annotations_collection_with_doc.each do |annotations, doc|
      ref_text = doc&.original_body || doc.body
      results = {}

      targets = annotations.filter {|a| a[:denotations].present? || a[:blocks].present? }
      targets.each_with_index do |annotation, index|
        # align_hdenotations
        text = annotation[:text]
        denotations = annotation[:denotations] || []
        blocks = annotation[:blocks] || []

        pipe.send(Ractor.make_shareable({
          index: index,
          ref_text: ref_text,
          text: text,
          denotations: denotations,
          blocks: blocks,
          options: options
        }), move: true)
      end.each do |annotation|
        _r, result = Ractor.select(*workers)

        if result[:error]
          raise "[#{annotation[:sourcedb]}:#{annotation[:sourceid]}] #{result[:error].message}"
        else
          results[result[:index]] = result
        end
      end

      messages << targets.map.with_index do |annotation, index|
        result = results[index]
        annotation[:denotations] = result[:denotations]
        annotation[:blocks] = result[:blocks]
        annotation[:text] = ref_text
        annotation.delete_if{|k,v| !v.present?}

        if result[:lost_annotations].present?
          {
            sourcedb: annotation[:sourcedb],
            sourceid: annotation[:sourceid],
            body:"Alignment failed. Invalid denotations found after transformation",
            data:{
              block_alignment: result[:block_alignment],
              lost_annotations: result[:lost_annotations]
            }
          }
        else
          nil
        end
      end.compact
    end

これで速くなるはずです。 うごかしてみます。

Ractor化する範囲を限定した時の計測結果

比較のためにRactorを使わない時の結果を載せます。

Ractor化する前の計測結果

2m14s -> 7m 41s です。 とても遅くなりました。

え?そんなことあるの?

GCの影響は少なそう

AWS EC2上で並列処理の時間を再々計測した - @ledsun blog で、次の3つの仮説を立てました。

  1. コピーがボトルネックではなかった
  2. コピーは減ったが相変わらずボトルネックである
  3. コピーで作られたオブジェクトのGCボトルネックだった

また1は可能性が低いと判断しました。 今回は3の可能性を検証します。

対象の処理は全体はActiveJobで実装しています。 そこで次のようなコールバックを定義して、GCの実行回数をしらべます。

  around_perform do |job, block|
    GC.start
    GC.stat => {minor_gc_count: prev_minor_gc_count, major_gc_count: prev_major_gc_count}
    block.call
  ensure
    GC.stat => {minor_gc_count: minor_gc_count, major_gc_count: major_gc_count}
    tengu_p minor_gc_count - prev_minor_gc_count, major_gc_count - prev_major_gc_count
  end

右代入というやつを初めて使いました。 余談です。

計測した結果は次の通りです。 249文章を処理しおわった時にジョブを中止して、その間に発生したGCの数を数えました。

minor_gc_count major_gc_count
Ractorなし 264 12
コピー2回 227 14
コピー1回 308 17

コピー回数とGCの回数には相関がありません。 「コピーで作られたオブジェクトのGCボトルネックだった」の可能性も低そうです。

消去法で「コピーは減ったが相変わらずボトルネックである」の可能性が高くなりました。 ここに注力するのが良さそうです。

参考

AWS EC2上で並列処理の時間を再々計測した

AWS EC2上で並列処理の時間を再計測した - @ledsun blogAWS環境で計測したデータが、その前に計測したデータと違いました。 どっちが真かわからないのでもう一度再計測しました。

AWS上で3回目の計測

AWS上で2回目の計測

AWS上で1回目の計測

1回目が明らかにちがうグラフです。 1回目の計測が間違っていたと考えられます。

考察

つぎの疑問は「ローカルPCではコピー数削減の効果があった。AWSでは効果が非常に小さいのはなぜか」です。 適当に仮説を立ててみます。

  1. コピーがボトルネックではなかった
  2. コピーは減ったが相変わらずボトルネックである
  3. コピーで作られたオブジェクトのGCボトルネックだった

1はわずかですが効果があるので、なさそうです。 2はテキストアライメント処理をもう少し深くみて、コピー量をさら減らすと確認できそうです。 3はGCを止めて効果があるか試してみればわかりそうです。

require_relativeはパッチれない

requrie_relativeをハックしたい

require_relativeの相対参照の起点となるもの - @ledsun blogRubyスクリプトのURLを保持する必要があるとわかりました。 そこで次の感じでURLを保持したVMクラスを作りました。

import { RubyScriptAndSourceURL } from "./RubyScriptAndSourceURL";

// To achieve require_relative, we need to resolve relative paths in Ruby scripts.
// VM to remember the URL of the running Ruby script.
export class RubyVMWithURL {
  private _vm;
  // Stores the URL of the running Ruby script to get the relative path from within the Ruby script.
  private _soruceURLStack: Array<URL>;

  constructor(vm) {
    this._vm = vm;
    this._soruceURLStack = [];
  }

  eval(script: RubyScriptAndSourceURL): void {
    if (script) {
      this.evalOn(script.URL, script.body);
    }
  }

  evalFromCurrentURL(scriptBody: string): void {
    this.evalOn(this.currentURL, scriptBody);
  }

  evalOn(url: URL, scriptBody: string): void {
    this._soruceURLStack.push(url);

    if (scriptBody) {
      this._vm.eval(scriptBody);
    }

    this._soruceURLStack.pop();
  }

  get currentURL(): URL {
    return this._soruceURLStack.at(-1);
  }
}

改めてみると、ちょっと不格好ですね。まあ、とりあえず動くので require_relativeのパッチを書きます。 次のようにrequire_relativeをJavaScriptで定義したrb_require_relativeに置き換えます。

    require "js"

    module Kernel
      def require_relative(relative_feature)
        ret = JS.global.rb_require_relative(relative_feature)

        return ret[:isLoaded] if ret[:errorMessage].to_s.empty?

        raise LoadError.new ret[:errorMessage].to_s
      end
    end

requireに影響が出てしまう

ところがrequire 'csv'が動かなくなります。

require "csv" 起因のエラー

組み込みのGemはパッチを当てなくてもrequire_relativeが動いていました。 つまり、require_relativeへのパッチが過剰に聞いています。 では、alias_method してもとのrequrie_relativeを実行して失敗したときだけ、JavaScript版を動かすと良さそうです。 次のイメージです。

module Kernel
  alias_method :hoge, :require_relative

  def require_relative(relative_feature)
    hoge(relative_feature)
  rescue LoadError
    ret = JS.global.rb_require_relative(relative_feature)
  end
end

ところがこれだ上手く行かないのです。 エラーがかわりません。

requrie_relativeはパッチれない

なぜかというと、これが表題の「requrie_relativeはパッチれない」です。

次のRubyスクリプトをCRubyで実行するとエラーが起きます。

module Kernel
  alias_method :hoge, :require_relative

  def require_relative(relative_feature)
    hoge(relative_feature)
  end
end

require 'csv'
ledsun@MSI:~/ruby.wasm►ruby test.rb
test.rb:five:in `require_relative': cannot load such file -- /home/ledsun/ruby.wasm/csv/fields_converter (LoadError)
        from test.rb:five:in `require_relative'
        from /home/ledsun/.rbenv/versions/3.2.0-preview2/lib/ruby/3.2.0+2/csv.rb:96:in `<top (required)>'
        from <internal:/home/ledsun/.rbenv/versions/3.2.0-preview2/lib/ruby/3.2.0+2/rubygems/core_ext/kernel_require.rb>:85:in `require'
        from <internal:/home/ledsun/.rbenv/versions/3.2.0-preview2/lib/ruby/3.2.0+2/rubygems/core_ext/kernel_require.rb>:85:in `require'
        from test.rb:nine:in `<main>'

ブラウザの時と似た感じのエラーです。 csv/fields_converterの場所がおかしいです。 これはrequire_relative がどうやって読み込むRubyスクリプトのパスを解決しているかに依存する現象です。

https://github.com/ruby/ruby/blob/3fae53a343ebd7686bb20d8f4b6855f4d11019cd/load.c#L958-L966

rb_f_require_relative(VALUE obj, VALUE fname)
{
    VALUE base = rb_current_realfilepath();
    if (NIL_P(base)) {
        rb_loaderror("cannot infer basepath");
    }
    base = rb_file_dirname(base);
    return rb_require_string(rb_file_absolute_path(fname, base));
}

rb_current_realfilepath()で現在実行中のRubyスクリプトのパスを取得しています。

https://github.com/ruby/ruby/blob/913979bede2a1b79109fa2072352882560d55fe0/vm_eval.c#L2552-L2556

rb_current_realfilepath(void)
{
    const rb_execution_context_t *ec = GET_EC();
    rb_control_frame_t *cfp = ec->cfp;
    cfp = vm_get_ruby_level_caller_cfp(ec, RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp));

rb_current_realfilepathではRUBY_VM_PREVIOUS_CONTROL_FRAME(cfp)を使って、require_relativeメソッドの呼び出し元を参照しています。 つまりrequire_relativeメソッドにパッチを当てると、常にパッチをあてた場所が相対パスの起点になってしまいます。

次の手

RubyVM中のrequire_relativeにパッチが当てられません。 RubyVMに与えるRubyスクリプトにパッチを当ててはどうでしょうか?

const patchedScript = scriptBody.replace(/require_relative/g, 'patched_require_relative')
this._vm.eval(patchedScript);

こういう感じです。 *1

*1:知らなかったのですが、TypeScriptってString.ReplaceAll使えないんですね。https://bobbyhadz.com/blog/typescript-string-replace-all-occurrences

AWS EC2上で並列処理の時間を再計測した

Ractor間のデータのやりとりでコピーを減らすには - @ledsun blogAWS上で再計測してみると書きました。 再計測しました。

AWS上でのコピー1回と2回の処理時間の比較
あまり効果がないようです。 ローカルPCで試したときは、次のようにもう少し明確に効果がありそうでした。

ローカルPC上でのコピー1回と2回の処理時間の比較

そもそも、前回AWSで計測したときは次のグラフでした。

前回計測したAWS上で処理時間

AWS EC2はインスタンスタイプが同じでも、インスタンスに個体差があるようです。 そのため処理時間が平均的に変わるのはわかります。 しかし1並列の時の処理時間が増えるのと減るので真逆の動きをしているの点は気になります。

1並列以外の挙動は、今回の計測と一致しています。 ということは、計測ミスだった可能性が考えられます。 同じインスタンスの実体を入手するのは不可能です。 とはいえ、計測ミスが混入している可能性がたかいので、再度計測した方が良さそうです。

Ractor間のデータのやりとりでコピーを減らすには

Ractor#send move:true したときに出た3つのエラー - @ledsun blog にて、ディープコピーを減らすためにmoveオプションを使いました。 ところがエラーがでました。またエラーの内容が難解で対応出来そうにありません。 そこで、ディープコピーを減らすための別の方法を考えます。

仮説

ドキュメントを見ると次の記述があります。

ruby/ractor.md at master · ruby/ruby · GitHub

There are 3 ways to send an object as a message (1) Send a reference: Sending a shareable object, send only a reference to the object (fast)

a shareable object は参照のみをコピーです。 ソースコードでは次の箇所が該当しそうです。

https://github.com/ruby/ruby/blob/v3_1_2/ractor.c#L930-L937

    else if (rb_ractor_shareable_p(obj)) {
        basket->type = basket_type_ref;
        basket->v = obj;
    }
    else if (!RTEST(move)) {
        basket->v = ractor_copy(obj);
        basket->type = basket_type_copy;
    }

今回のRactorの構成では、次のように2回Ractor間のデータの受け渡しがあります。

main Ractor -> pipe Ractor -> worker Ractor

ディープコピーが2回走っているはずです。 これを次の振る舞いに変更します。

  1. main Ractorで送信データをshareableにする
  2. workerで受信データをディープコピーしてから編集する

これでコピー回数が減り、処理時間が短くなるはずです。

実装

   pipe = Ractor.new do
      loop do
        msg = Ractor.receive
        Ractor.yield(msg)
      end
    end

    workers = (1..4).map do
      Ractor.new pipe do |pipe|
        while msg = pipe.take
          a, ref_text, o = msg
          aligner = TextAlignment::TextAlignment.new(ref_text, o)
          copy_a = Marshal.load(Marshal.dump(a))
          m = copy_a.map do |annotation|
            Annotation.align_annotations!(annotation, ref_text, aligner)
          end.flatten

          result = [m, copy_a]
          Ractor.yield(result, move: true)
        end
      end
    end

    annotations_collection_with_doc = annotations_collection_with_doc.collect do |annotations, doc|
      ref_text = doc.original_body.nil? ? doc.body : doc.original_body
      msg = [annotations, ref_text, options]
      Ractor.make_shareable(msg)
      pipe.send(msg)
      doc
    end

ポイントは次の2点です。

main Ractorで送信データをshareableにします。

Ractor.make_shareable(msg)
pipe.send(msg)

workerで受信データをディープコピーします。

copy_a = Marshal.load(Marshal.dump(a))
m = copy_a.map do |annotation|
  Annotation.align_annotations!(annotation, ref_text, aligner)
end.flatten

検証

この実装で処理時間が短くなるか確かめてみます。

コピー回数を減らして処理時間が短くなるか検証したグラフ

ディープコピーを減らした結果、処理時間は短くなりました。 期待していたよりは効果は薄く見えます。 また、詰まっているのはワーカーでなくパイプ? - @ledsun blogで掲載したグラフと特徴が異なります。 特に並列数1で処理時間が短くなっていない点が異なります。

前回の計測は、次の2点が異なります。

  1. AWSではなくローカルPCで計測している。CPUやメモリの性能が異なる。
  2. DBへのインサート処理をスキップしていない。処理時間中のRactorの動作時間は減っている。

考察

ディープコピーを減らすと処理時間が減ることがわかりました。 しかし、仮説「Ractor間のデータ受け渡し時のコピーがボトルネックになっている」までは確認できませんでした。

つぎの2つに挑戦してみます。

  1. AWSで性能計測する。ディープコピー削減の影響に関する情報を増やす
  2. 処理内容をみなおす。ディープコピーをなくす。

Ractor#send move:true したときに出た3つのエラー

詰まっているのはワーカーでなくパイプ? - @ledsun blog で、「Ractor間のデータ受け渡し時のコピーがボトルネックになっている」という仮説を立てました。 これを確認するために、Ractor間のデータの受け渡しをmoveにしてみます。

次のようにRactor#sendとRactor.yieldを使っている場所に move: true オプションをつけていきます。

    pipe = Ractor.new do
      loop do
        msg = Ractor.receive
        Ractor.yield(msg, move: true)
      end
    end

    workers = (1..4).map do
      Ractor.new pipe do |pipe|
        while msg = pipe.take
          a, ref_text, o = msg
          aligner = TextAlignment::TextAlignment.new(ref_text, o)
          m = a.map do |annotation|
            Annotation.align_annotations!(annotation, ref_text, aligner)
          end.flatten

          Ractor.yield([m, a], move: true)
        end
      end
    end

    Ractor.make_shareable(options)
    annotations_collection_with_doc = annotations_collection_with_doc.collect do |annotations, doc|
      ref_text = doc.original_body.nil? ? doc.body : doc.original_body
      ref_text.freeze
      pipe.send([annotations, ref_text, options], move: true)
      doc
    end

これを実行すると次の3つのエラーが起きるようになりました。 実行するといずれかのエラーがおきて止まります。

安易にmoveオプションをつければいいわけではなさそうです。

パターン1 no implicit conversion of Symbol into Integer (TypeError)

14:07:32 workor.1 | #<Thread:0x00007fda6a006788 run> terminated with exception (report_on_exception is true):
14:07:32 workor.1 | /home/ledsun/pubannotation/app/models/annotation.rb:466:in `[]': no implicit conversion of Symbol into Integer (TypeError)
14:07:32 workor.1 |     from /home/ledsun/pubannotation/app/models/annotation.rb:466:in `align_annotations!'
14:07:32 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:767:in `block (3 levels) in store_annotations_collection'
14:07:32 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:766:in `map'
14:07:32 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:766:in `block (2 levels) in store_annotations_collection'

Rubyで一番苦手なエラーです。 ハッシュの代わりに配列が渡されているのでしょうか? エラーが起きている箇所は次のようなソースコードです。

   def self.align_annotations!(annotations, ref_text, aligner)
        return [] unless annotations[:denotations].present? || annotations[:blocks].present?

この呼び出し元は最初のソースコードAnnotation.align_annotations!(annotation, ref_text, aligner)です。 いままでハッシュで呼び出していたはずが、配列に変わることがあるみたいです。 同じ変数に格納されている値が変わっているのでしょうか? 何が起きているのか、いまいち想像できません。

パターン2 wrong argument type false (expected Class) (TypeError)

14:21:16 workor.1 | #<Thread:0x00007fda69e0cf18 run> terminated with exception (report_on_exception is true):
14:21:16 workor.1 | <internal:ractor>:627:in `yield': wrong argument type false (expected Class) (TypeError)
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:757:in `block (2 levels) in store_annotations_collection'
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:755:in `loop'
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:755:in `block in store_annotations_collection'
14:21:16 workor.1 | #<Thread:0x00007fda69e0ca90 run> terminated with exception (report_on_exception is true):
14:21:16 workor.1 | <internal:ractor>:694:in `take': thrown by remote Ractor. (Ractor::RemoteError)
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:763:in `block (2 levels) in store_annotations_collection'
14:21:16 workor.1 | <internal:ractor>:627:in `yield': wrong argument type false (expected Class) (TypeError)
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:757:in `block (2 levels) in store_annotations_collection'
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:755:in `loop'
14:21:16 workor.1 |     from /home/ledsun/pubannotation/app/models/project.rb:755:in `block in store_annotations_collection'

見慣れないエラーです。 起きている箇所は最初のソースコードRactor.yield(msg, move: true)です。 pipe役のRactorがデータを送ろうとしたところで、エラーが起きているようです。

https://github.com/ruby/ruby/blob/v3_1_2/ractor.c を見てみます。

ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
{
    VALUE ret_r;
    ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r);
    return Qnil;
}

Racter.yieldの引数はobjで、型エラーに引っかかりそうもありません。 どこかでそれらしい引数で rb_raise(rb_eArgError しているのかというと、こっちもみつかりません。

このエラーはどこで何がおかしいのか、想像できません。

パターン 3 <OBJ_INFO:gc_mark_ptr@gc.c:6713> 0x00007fda6b47ec10 [2 M ] T_NONE

ながいのでgistにしました。 PubAnnotationでRactor#send move:true したときに起きたエラー · GitHub このエラーが起きたときは最終的にSIGIOTでSidekiqプロセスが終了します。 僕は、C言語レイヤーのエラーを上手く読み解けません。 gc_mark_ptr とあるので、GCの途中でエラーが起きているのでしょうか?

詰まっているのはワーカーでなくパイプ?

並列数をかえても処理時間が変わらない謎 - @ledsun blog で「サチっているかもしれない」と仮説を立てました。 それを検証するために並列数を変えて計測してみました。

並列数と処理時間のグラフ

処理時間なので低いほど性能が高いです。 3どころか1並列でサチっています。 *1

1なのは予想外です。 予想外なので、この1がヒントになりそうです。 今回の並列処理の構成上1に該当する物があります。 並列数を管理するためにワーカープールがあります。 ワーカーへのデータの受け渡しにpipeを使っています。 ソースコードは次です。

    Ractor.make_shareable(TextAlignment::CHAR_MAPPING)
    Ractor.make_shareable(TextAlignment::LCSMin::PLACEHOLDER_CHAR)
    pipe = Ractor.new do
      loop do
        Ractor.yield Ractor.receive
      end
    end

    workers = (1..4).map do
      Ractor.new pipe do |pipe|
        while msg = pipe.take
          a, ref_text, o = msg
          aligner = TextAlignment::TextAlignment.new(ref_text, o)
          m = a.map do |annotation|
            Annotation.align_annotations!(annotation, ref_text, aligner)
          end.flatten

          Ractor.yield [m, a]
        end
      end
    end

    annotations_collection_with_doc = annotations_collection_with_doc.collect do |annotations, doc|
      ref_text = doc.original_body.nil? ? doc.body : doc.original_body
      pipe.send([annotations, ref_text, options], move: true)
      doc
    end.map do |doc|
      _r, (error_messages, aligned_annotations) = Ractor.select(*workers)
      messages += error_messages
      [aligned_annotations, doc]
    end

つまりどのワーカーにデータを送るにしても、かならず1つのpipeを通ります。 pipeは1つなのです。 1は怪しい数字です。

pipeはボトルネックになり得るのでしょうか? RactorのsendはデフォルトではCopyです。

https://github.com/ruby/ruby/blob/master/doc/ractor.md#communication-between-ractors

You can choose "Copy" and "Move" by the move: keyword, Ractor#send(obj, move: true/false) and Ractor.yield(obj, move: true/false) (default is false (COPY)).

前述のソースコードのとおりオプションはつけていません。 Ractor間のデータのやりとりはCopyです。 CopyはMoveよりは遅いです。 「ワーカーがCopy待ちしている」という仮説はあり得そうです。

今回の処理では、ワーカーは受け取ったデータを変更しますが、メインRactorはワーカーの処理がおわってから、データを参照します。 Ractor間のすべてのデータの受け渡しをMoveにできるはずです。

もしかして、ついに突破口が見つかったのでしょうか?

*1:ちなみに0は並列化していない、直列処理です。比較しやすくするために0に置いてあります。

並列数をかえても処理時間が変わらない謎

とあるRuby on RailsアプリケーションのSidekiqで動くバックグラウンドジョブを高速化をしています。 CPUバウンドな処理に時間が掛かっています。 並列化して高速化できるか試しています。 RactorでWoker pool - @ledsun blog で、4コアのPCで並列化できるところまで確認できています。 ただし、4並列というほどには高速化出来ていませんでした。 検証につかっているPCやWSLなどの環境に起因するものでしょうか? 検証用にAWS EC2を借りて環境構築しました。

8コアあるt3.x2largeを使います。 今回は使って4並列と8並列で計測して、高速化できるか試します。

4並列で計測

つぎのようにRactorを使って4ワーカーで実行するソースコードがあります。 https://github.com/pubannotation/pubannotation/blob/54a53d80d39a521eff3f4be4110b844f075f4d26/app/models/project.rb#L760-L772

    workers = (1..4).map do
      Ractor.new pipe do |pipe|
        while msg = pipe.take
          a, ref_text, o = msg
          aligner = TextAlignment::TextAlignment.new(ref_text, o)
          m = a.map do |annotation|
            Annotation.align_annotations!(annotation, ref_text, aligner)
          end.flatten

          Ractor.yield [m, a]
        end
      end
    end

計測してみましょう。

4並列で819文章の処理に2分16秒掛かります。

8並列で計測

これに次のパッチをあてて8並列します。

diff --git a/app/models/project.rb b/app/models/project.rb
index 9a769ef2..85da2cac 100644
--- a/app/models/project.rb
+++ b/app/models/project.rb
@@ -757,7 +757,7 @@ class Project < ActiveRecord::Base
       end
     end

-    workers = (1..4).map do
+    workers = (1..8).map do
       Ractor.new pipe do |pipe|
         while msg = pipe.take
           a, ref_text, o = msg
@@ -826,7 +826,7 @@ class Project < ActiveRecord::Base

     messages << { body: "Uploading for #{num_skipped} documents were skipped due to existing annotations." } if num_skipped > 0

-    InstantiateAndSaveAnnotationsCollection.call(self, aligned_collection) if aligned_collection.present?
+    #InstantiateAndSaveAnnotationsCollection.call(self, aligned_collection) if aligned_collection.present?

     messages
   end

パッチの後半はDBへのインサート処理をコメントアウトしています。 これは4並列のときも行っています。

計測してみます。

8並列で819文章の処理に2分16秒掛かります。

並列数をあげたのに処理時間は変わりません。

そもそも並列化できていないのでしょうか?

直列で計測

並列化しないと次のようなソースコードです。

https://github.com/pubannotation/pubannotation/blob/15ba8c0a546a3447845af19006026d67f906f263/app/models/project.rb#L752-L758

    annotations_collection_with_doc.each do |annotations, doc|
      ref_text = doc.original_body.nil? ? doc.body : doc.original_body
      aligner = TextAlignment::TextAlignment.new(ref_text, options)
      messages += annotations.map do |annotation|
        Annotation.align_annotations!(annotation, ref_text, aligner)
      end.flatten
    end

これで計測してみましょう。

直列で819文章の処理に3分13秒掛かります。

一分近く処理時間が伸びます。 並列化できてそうです。

考察

8並列と4並列の処理時間が全く一緒なのが予想外の結果です。 2倍にならないとしても、いくらか高速化することを予想していました。 これはどのような現象が起きているのでしょうか?いくつか仮説を考えてみます。

「並列化した先で実は同一の資源を使っている部分がある。」 今回Ractorで並列化しているので、メモリやDBへの接続を共有したらエラーがおきるはずです。

「並列処理が終わったあとにボトルネックがある。」 stackprofで実際に掛かっている時間を計測すれば、起きているかどうか確認できそうです。 Stackprofで時間の掛かる処理を探そうとして上手く行かなかった話 - @ledsun blog で、直列処理を計測したときは、並列化した部分に時間が掛かっていました。 この線もなさそうです。

「8並列の遙か手前、たとえば3並列でサチっている。」 原因は推測できませんが、起きているかもしれません。 これは2並列、3並列で計測すれば、起きているかどうか確認できそうです。