ruby نقية روبي المتزامنة تجزئة




concurrency hash (8)

حسنًا ، الآن بعد أن حددت المعنى الفعلي لـ "threadsafe" ، إليك تطبيقان محتملان. سيتم تشغيل التعليمة البرمجية التالية إلى الأبد في MRI و JRuby. يتبع التنفيذ بدون قفل نموذج تناسق نهائي حيث يستخدم كل مؤشر ترابط طريقة عرض التجزئة الخاصة به إذا كان الرئيسي في حالة تغير مستمر. هناك خدعة صغيرة مطلوبة للتأكد من أن تخزين جميع المعلومات في الخيط لا يسري الذاكرة ، ولكن يتم التعامل معها واختبارها - لا ينمو حجم العملية بتشغيل هذا الرمز. يحتاج كل من التنفيذين إلى مزيد من العمل ليكون "مكتملاً" ، أي أن الحذف أو التحديث أو ما إلى ذلك قد يحتاج إلى بعض التفكير ، ولكن أي من المفهومين التاليين سيلبي متطلباتك.

من المهم للغاية بالنسبة للأشخاص الذين يقرؤون هذا الموضوع أن يدركوا أن القضية برمتها هي حصرية لـ JRuby - في MRI ، فإن التجزئة المدمجة كافية.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end

ما هي أفضل طريقة لتطبيق تجزئة يمكن تعديلها عبر سلاسل عمليات متعددة ، ولكن مع أصغر عدد من التأمينات. لأغراض هذا السؤال ، يمكنك أن تفترض أن التجزئة سوف تكون ثقيلة الوزن. يجب أن يكون خيطًا آمنًا في جميع تطبيقات Ruby ، ​​بما في ذلك تلك التي تعمل بطريقة متزامنة حقًا ، مثل JRuby ، ​​ويجب كتابتها في Ruby خالص (لا يُسمح بـ C أو Java).

لا تتردد في تقديم حل ساذج يتم قفله دائمًا ، ولكن ليس من المحتمل أن يكون هذا هو الحل الأفضل. نقاط للأناقة ، ولكن احتمالية أقل من تحقيق الفوز على رمز أصغر.


Answer #1

لم تختبر ، وطعنة ساذجة في الاستفادة المثلى للقراءة. يفترض أنه في معظم الأوقات ، لن يتم قفل القيمة. إذا كان الأمر كذلك ، فإن الحلقة الضيقة ستحاول حتى تكون. أضع Thread.critical هناك للمساعدة على ضمان تشغيل مؤشرات الترابط القراءة حتى اكتمال الكتابة. إذا لم تكن متأكدًا من الحاجة إلى الجزء المهم ، فستعتمد بالفعل على مدى قوتك الثقيلة ، لذا فإن بعض المعايير تكون في محلها.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

قد يكون هناك عدد قليل من طرق القراءة الأخرى التي تحتاج إلى التحقق من قفل قفل البريد @ ، لا أعرف ما إذا كان كل شيء آخر يتم تنفيذه من حيث # [].


Answer #2

يهودا ، أعتقد أنك ذكرت أن وضع ivar كان ذريًا؟ ماذا عن نسخة بسيطة ومبادلة بعد ذلك؟

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end

Answer #3

أنا غير واضح إلى حد ما على ما هو المقصود من هذا. أعتقد أن أبسط تطبيق هو ببساطة

Hash

وهذا يعني أن تجزئة روبي المضمنة هو threadafe إذا كان يعني عن طريق threadafe لن تفجير إذا كان يحاول 1> مؤشرات الترابط الوصول إليه. سيتم تشغيل هذا الرمز بأمان إلى الأبد

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

أظن أنه بخيط آمن ، فأنت تعني حقاً ACID - على سبيل المثال كتابة مثل hash [: key] =: val متبوعة بقراءة إذا كان [: key] سيعود: val. ولكن لا يمكن لأي قدر من الخداع مع القفل أن يوفر ذلك - الفوز الأخير سيكون دائمًا. على سبيل المثال ، لنفترض أن لديك 42 مؤشر ترابط كل تحديث تجزئة threadsafe - الذي يجب قراءة القيمة بواسطة 43'rd ؟؟ بالتأكيد عن طريق threasafe لا تعني نوعا من الطلب الكلي على الكتابة - لذا إذا كانت 42 خيوطًا كتابة فعالة فإن القيمة "الصحيحة" هي أي حق؟ لكن "روبي" المدمج في "هاش" يعمل بهذه الطريقة ...

ربما تقصد شيء من هذا القبيل

hash.each do ...

في موضوع واحد و

hash.delete(key)

لن تتداخل مع بعضها البعض؟ أستطيع أن أتخيل الرغبة في أن يكون سلفافي ، ولكن هذا ليس آمنا حتى في خيط واحد مع روبي التصوير بالرنين المغناطيسي (من الواضح أنه لا يمكنك تعديل تجزئة أثناء التكرار أكثر من ذلك)

لذا هل يمكنك أن تكون أكثر تحديدًا حول ما تعنيه بـ "الآمن"؟

الطريقة الوحيدة لإعطاء دلالات ACID ستكون قفل إجمالي (تأكد من أن هذا قد يكون طريقة أخذت كتلة - ولكن لا يزال قفل خارجي).

لا جدولة الخيط ruby ​​فقط لجدولة صفعة مؤشر ترابط في منتصف بعض الدالة c التعسفي (مثل أساليب asf hasf المضمنة التجزئة) بحيث تكون آمنة سلفي بشكل فعال.


Answer #4

لسوء الحظ ، لا يمكنني إضافة تعليق على إجابة مايكل سوارر حيث يقدم: فئة RWLock و Class LockedHash معreader_count إلخ. (ليس لديك ما يكفي من الكارما حتى الآن)

هذا الحل لا يعمل. يعطي خطأ: في "إلغاء القفل": محاولة فتح كائن المزامنة غير المؤمن (ThreadError)

نظرًا لوجود خطأ منطقي: عندما يحين وقت فتح الأشياء ، يحدث إلغاء القفل مرة واحدة إضافية (بسبب عدم التحقق من my_block؟ (). وبدلاً من ذلك يتم إلغاء حظره حتى إذا لم يكن إلغاء الحظر ضروريًا "هو الحظر") إلغاء كتم الصوت يثير استثناء. (سوف ألصق رمزًا كاملاً حول كيفية إعادة إنتاج هذا الخطأ في نهاية هذا المنشور).

كما ذكر مايكل "كل طريقة غير آمنة (تسمح بالطفرات بواسطة سلاسل رسائل أخرى أثناء التكرار)" التي كانت حاسمة بالنسبة لي ، لذلك انتهى بي الأمر مع هذا الحل المبسط الذي يعمل في جميع حالات الاستخدام الخاصة بي ، وببساطة يقوم بإقفال mutex في أي مكالمة إلى أي طريقة تجزئة عند استدعاء من مؤشر ترابط مختلف (لا يتم حظر المكالمات من نفس مؤشر الترابط الذي يملك القفل لتجنب deadlocks):

#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be 
# able to even read from it)
#
class TrulyThreadSafeHash
  def initialize
    @mutex = Mutex.new
    @hash = Hash.new
  end

  def method_missing(method_sym, *arguments, &block)

    if [email protected]?  # Returns true if this lock is currently held by current thread
        # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
        # Following call will be blocking if mutex locked by other thread:
        @mutex.synchronize{
            return lambda{@hash.send(method_sym,*arguments, &block)}.call
        }
    end

    # We already own the lock (from current thread perspective).
    # We don't even check if @hash.respond_to?(method_sym), let's make Hash
    # respond properly on all calls (including bad calls (example: wrong method names))
    lambda{@hash.send(method_sym,*arguments, &block)}.call
  end

  # since we're tyring to mimic Hash we'll pretend to respond as Hash would
  def self.respond_to?(method_sym, include_private = false)
    Hash.respond_to(method_sym, include_private)
  end

  # override Object's to_s because our method_missing won't be called for to_s
  def to_s(*arguments)
      @mutex.synchronize{
        return @hash.to_s
      }
  end

  # And for those, who want to run extra mile:
  # to make our class json-friendly we shoud require 'json' and uncomment this:
  #def to_json(*options)
  #    @mutex.synchronize{
  #        return @hash.to_json(*options)
  #    }
  #end

end

والآن المثال الكامل لشرح / إعادة إنتاج خطأ إلغاء القفل المزدوج في حل Michael Sofaer:

#!/usr/bin/env ruby

# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end



class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======


# global hash object, which will be 'shared' across threads
$h = LockedHash.new

# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimeter (capitalized when last hash item read)
def hash_reader(delim)
    loop{
        count = 0
        $h.each{
            count += 1
            if count != $h.size
                $stderr.print delim
            else
                $stderr.puts delim.upcase
            end
        }
    }
end

# fill hash with 10 items
10.times{|i|
    $h[i] = i
}

# create a thread which will read $h hash
t1 = Thread.new(){
    hash_reader("o")
}

t1.join  # will never happen, but for completeness

والذي يعطي الخطأ التالي:

./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
        from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
        from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
        from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
        from ./LockedHash_fails_to_unlock.rb:98:in `loop'
        from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
        from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'

Answer #5

منذ أن ذكرت أن التجزئة سوف تُقرأ ثقيلًا ، فإن وجود كائن واحد يقرأ كل من القراءة والكتابة سيؤدي إلى ظروف سباق من المحتمل أن تكون قد فازت بالقراءات. إذا كان ذلك مناسبًا لك ، فتجاهل الإجابة.

إذا كنت ترغب في إعطاء الأولوية للكتابة ، فسيساعدك قفل القراءة والكتابة. تعتمد التعليمة البرمجية التالية على بعض تعيينات c ++ القديمة لفئة أنظمة التشغيل ، لذا قد لا تكون أفضل جودة ، ولكنها تعطي فكرة عامة.

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

ثم فقط التفاف [] = و [] في lock.write و lock.read. قد يكون لها تأثير على الأداء ، ولكن ستضمن أن الكتابة سوف "تمر عبر" القراءات. تعتمد فائدة ذلك على مدى ثقله في الواقع.


Answer #6

قد يكون هذا حالة استخدام للأحجار الكريمة الهمستر

يقوم الهامستر بتنفيذ نظام Hash Array Array Mapped Tries (HAMT) ، بالإضافة إلى بعض هياكل البيانات الثابتة ، في روبي النقي.

تكون هياكل البيانات الثابتة غير قابلة للتغيير ، وبدلاً من تغيير (تغيير) البنية ، مثل إضافة أو استبدال زوج من القيمة الرئيسية في Hash ، تقوم بدلاً من ذلك بإعادة هيكل بيانات جديد يحتوي على التغيير. الخدعة ، مع هياكل البيانات الثابتة غير القابلة للتغيير ، هي أن بنية البيانات التي تم إرجاعها حديثا تعيد استخدام أكبر قدر من السلفيات قدر الإمكان.

أعتقد أن تنفيذ استخدام الهامستر ، يمكنك استخدام المجمع التجزئة muthot ، الذي يمر جميع القراءات إلى القيمة الحالية للتجزئة الثابتة غير القابلة للتغيير (أي يجب أن تكون سريعة) ، في حين يحرس كل يكتب مع كائن المزامنة ، والتبديل إلى القيمة الجديدة من تجزئة الثابتة الثابتة بعد الكتابة.

فمثلا:

require 'hamster'
require 'hamster/experimental/mutable_hash'    
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)

# reading goes directly to hash
puts hsh[:name] # Simon

# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe

لذا ، دعنا نستخدم هذا لنوع مماثل من المشاكل إلى المشكلة الموضحة:

( جست هنا )

require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100 
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil

    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]

        Thread.pass

        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end

threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"

أتلقى المخرجات التالية:

ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s

jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s

ما رأيك بهذا؟

هذا الحل هو أكثر شبهاً بكيفية حل هذا في Scala أو Clojure ، على الرغم من أنه في هذه اللغات ، فمن المحتمل أن يستخدم أحد هذه البرامج ذاكرة معاملات برمجيات مع دعم وحدة المعالجة المركزية منخفض المستوى لعمليات المقارنة والتبادل الذري التي يتم تنفيذها.

تحرير : من الجدير بالذكر أن أحد الأسباب وراء سرعة تنفيذ الهمستر هو أنه يحتوي على مسار قراءة خالية من القفل . يرجى الرد في التعليقات إذا كانت لديك أسئلة حول ذلك أو كيف يعمل.


Answer #7

نشر قاعدة / سذاجة الحل ، فقط لتعزيز بلدي :

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end




locking