If you need to process 1000 items independently, the simplest thing to do is handle them one by one. But what if most of the time per item is spent on a delay that isn't caused by your application? What if you need to make a 100ms web request per item, only to do about 10ms of processing on it? Handled sequentially, that's 1000 x 110ms (100ms of waiting plus 10ms of work per item), or roughly 110 seconds!
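The sequential baseline looks something like this (handle stands in for whatever the per-item work is; the timings are just the numbers from the scenario above):

items.each do |item|
  handle(item)  # ~100ms waiting on the network + ~10ms of CPU work
end
# 1000 items * ~110ms each is about 110 seconds total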
So what do we do?
You could multithread the application, and the naive way looks like this:
items.each do |item|
  Thread.new(item) do |i|
    handle(i)
  end
end
The problem is that this spawns 1000 threads at once: all 1000 requests go out simultaneously, and when the responses come back about 100ms later, your system gets slammed trying to process them all at the same time.
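One way to avoid that is to cap the number of concurrent workers with a small pool pulling items off a shared queue. Here's a rough sketch (the pool size of 10 is explained just below, and handle is the same placeholder as above):

require 'thread'

queue = Queue.new
items.each { |item| queue << item }
# all items are enqueued before the workers start,
# so an empty queue really does mean we're done
workers = Array.new(10) do
  Thread.new do
    loop do
      item = begin
               queue.pop(true)  # non-blocking pop
             rescue ThreadError
               break            # queue is empty, this worker is finished
             end
      handle(item)
    end
  end
end
workers.each(&:join)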
Ideally, we would like to pipeline the processing so that at every millisecond we are doing useful work on some item, keeping both our CPU and our internet connection as busy as possible. In this ideal situation, the whole task would take 10,000ms (10ms of processing per item x 1000 items) + 100ms (the wait for the first response to come back), or just over 10 seconds.
A little calculation tells us what the pool should look like: each item spends 100ms waiting for every 10ms of processing, so about ten items can be in flight while one is being worked on. In other words, we want 10 workers running at once, each starting roughly 10ms apart. That's the thread pool sketched above. In this post, though, I'm going to work with forking instead; it's more memory intensive, but it can actually use multiple cores, so it's more useful. Under Ruby Enterprise Edition, a lot of that memory is shared copy-on-write, so it's not so bad.
# the items we need to work through
items_to_process = my_items
# maximum number of children running at once
max_process = 10
# keep track of the running children, keyed by pid
processing_pids = {}

# loop while we still have items to start or children to wait on
until items_to_process.empty? && processing_pids.empty?
  # if we are below our maximum number of running children
  # (and still have items left), fork another worker
  while processing_pids.size < max_process && !items_to_process.empty?
    item_to_process = items_to_process.pop
    # don't share the database connection across the fork:
    # drop it before forking, then re-establish it in the child
    # and again in the parent
    ActiveRecord::Base.remove_connection
    pid_starting = Process.fork do
      ActiveRecord::Base.establish_connection
      process_item(item_to_process)
      ActiveRecord::Base.remove_connection
    end
    ActiveRecord::Base.establish_connection
    # track the item by pid; you could put other stuff in this hash too
    processing_pids[pid_starting] = { :item => item_to_process }
  end

  # pick up a finished child. pid 0 means any child in our process
  # group; with WNOHANG this returns nil immediately if none have
  # exited yet, and we just go around the loop again
  if pid_done = Process.wait(0, Process::WNOHANG)
    # grab it out of the hash
    if job_finished = processing_pids.delete(pid_done)
      puts "We finished item #{job_finished[:item]}"
    end
  end

  # quick nap so we don't spin through this loop too hard
  sleep(0.001)
end
The basic idea here is to keep a pool of forked workers running; every millisecond or so we check whether any have finished and whether new ones need to be started. When a fork finishes we can do any post-processing on it, and the next time around the loop a replacement will be forked.
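What process_item actually does is up to your application. For the scenario at the top of the post it might look roughly like this (the URL, the id, and the result column are made-up placeholders):

require 'net/http'
require 'uri'

def process_item(item)
  # the ~100ms part: a web request we can't speed up
  response = Net::HTTP.get_response(URI.parse("http://example.com/lookup?id=#{item.id}"))
  # the ~10ms part: our own processing, here just saving the result
  item.update_attribute(:result, response.body)
end

Since each child establishes its own ActiveRecord connection in the fork above, it's safe for process_item to touch the database like this.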
Want to learn more from SmartLogic, a Ruby on Rails focused development firm? Follow us on Twitter.