This post is about Copas, a library that provides cooperative concurrency by
integrating co-routines with networking libraries such as LuaSocket and
LuaSec.  It is one of the most widely used libraries for asynchronous
processing in Lua.

One issue that has been particularly interesting to me is that up to Copas 2,
the library supported a high-level interface called "limitset" that wrapped
copas.addthread with the goal of limiting the maximum number of such
co-routines running at a given time.  Copas 3 no longer provides such an API,
and here we will try to build it on top of the newer Copas versions.

Let's start by introducing the API:

copas.limit.new(max)
   Creates and returns a `limitset` that limits the number of concurrently
   running tasks to `max`.  E.g. with 100 HTTP requests added to a set with
   `max == 10`, no more than 10 requests are performed simultaneously; the
   next one starts only when a running request finishes.

limitset:addthread(func [, ...])
   Adds a task to the queue and returns the co-routine created, exactly like
   `copas.addthread`, except that the task runs within the limits of the
   set.  Can be called while the set of tasks is executing.

limitset:wait()
   Will yield until all tasks in the set have finished.

Simple enough.  Let's write a first implementation using the `semaphore` API
provided by Copas 3.  Hint: try to spot the issues in this naïve first
version:


local copas = require "copas"

local limitset = {}
limitset.__index = limitset

local function limitset_new(max)
  return setmetatable({max=max, sem=copas.semaphore.new(max, max)}, limitset)
end

function limitset:addthread(func, ...)
  return copas.addthread(function(...)
    self.sem:take()
    func(...)
    self.sem:give()
  end, ...)
end

function limitset:wait()
  self.sem:take(self.max)
  self.sem:give(self.max)
end


The first issue (and the easiest one to see) is that if a task fails we never
reach self.sem:give(), so its slot is never returned: tasks queued on the
semaphore may never get to run, and wait() stays blocked.  This can be solved
by wrapping the call to func with `pcall`, as in pcall(func, ...), and
re-throwing the error after the semaphore has been released.
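
As a minimal sketch of that pattern in plain Lua (no Copas involved): the
`slots` table and the `take`, `give` and `guarded` helpers below are
hypothetical stand-ins for the semaphore and the task wrapper, used only to
show that the slot is returned whether or not the task fails.

```lua
-- Hypothetical stand-in for the semaphore: it only counts free slots.
local slots = { free = 1 }
local function take() slots.free = slots.free - 1 end
local function give() slots.free = slots.free + 1 end

-- Runs func while holding a slot; the slot is returned even if func fails.
local function guarded(func, ...)
  take()
  local ok, err = pcall(func, ...)
  give()
  -- Test `ok`, not `err`: on success `err` holds func's first return value.
  if not ok then error(err, 0) end
end

-- A failing task: the error still reaches the caller, the slot is back.
local ok, err = pcall(guarded, function() error("boom", 0) end)
print(ok, err, slots.free)   -- false  boom  1

-- A successful task that returns a value must not be treated as a failure.
local ok2 = pcall(guarded, function() return "a result" end)
print(ok2, slots.free)       -- true  1
```

Note that the decision to re-throw is based on the first result of pcall; the
second result is only an error object when the first one is false.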

Another issue which is certainly harder to see is that if we do:
   local mylimitset = copas.limit.new(10)
   mylimitset:addthread(function() --[[...]] end)
   mylimitset:wait()

There's no guarantee that the co-routine created by `mylimitset:addthread`
will have started by the time the :wait() method is called.  This means the
wait method could complete before the new thread performs its initial
`self.sem:take()`.
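
The race can be pictured with a toy model in plain Lua.  This is only a
sketch under the assumption stated above, namely that a newly added task is
queued and may not run before addthread returns; `free`, `max` and `ready` are
made-up stand-ins for the semaphore and the scheduler, not Copas code.

```lua
-- Toy model only: `free`/`max` stand in for the semaphore and `ready` for the
-- scheduler queue.  None of this is Copas code.
local max = 2
local free = max
local ready = {}

-- Like the naive addthread: the "take" happens inside the new co-routine.
local function addthread(func)
  local co = coroutine.create(function()
    free = free - 1
    func()
    free = free + 1
  end)
  ready[#ready + 1] = co   -- assumption: the task is queued, not run yet
  return co
end

local ran = false
addthread(function() ran = true end)

-- What the naive wait() sees right after addthread returns: every slot is
-- still free, so take(max) would succeed at once.
local wait_returns_immediately = (free == max)
local ran_before_wait = ran

-- The task only runs once the scheduler gets to it.
for _, co in ipairs(ready) do coroutine.resume(co) end

print(wait_returns_immediately, ran_before_wait, ran)   -- true  false  true
```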

Even if it were started in time, the order in which the semaphore serves
requests is not specified in the documentation; the source code shows that
requests are granted in FIFO order and are added to the queue by the take
method.  This means that once mylimitset:wait() calls
self.sem:take(self.max), every thread added afterwards has to wait until all
the previous threads in the limitset have completed.
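
To make the ordering problem concrete, here is a toy FIFO semaphore in plain
Lua.  It only models the behavior described in the paragraph above and is not
the Copas implementation; `sem`, `serve`, `take`, `give` and the request names
are all made up for illustration.

```lua
-- Toy FIFO semaphore, modelled on the behavior described above.  It is not
-- the Copas implementation; names and structure are illustrative only.
local sem = { free = 2, queue = {} }
local granted = {}   -- request names, in the order they were granted

-- Serve requests strictly from the head of the queue.
local function serve()
  while sem.queue[1] and sem.queue[1].n <= sem.free do
    local req = table.remove(sem.queue, 1)
    sem.free = sem.free - req.n
    granted[#granted + 1] = req.name
  end
end

local function take(name, n)
  sem.queue[#sem.queue + 1] = { name = name, n = n }
  serve()
end

local function give(n)
  sem.free = sem.free + n
  serve()
end

take("task A", 1)   -- granted at once
take("task B", 1)   -- granted at once, no slots left
take("wait", 2)     -- wait() asks for every slot and queues up
take("task C", 1)   -- added after wait(): queued behind it

give(1)             -- task A finishes: one slot is free...
local after_first_give = table.concat(granted, ", ")
-- ...but task C is stuck behind "wait", which needs two slots.

give(1)             -- task B finishes: "wait" is granted
give(2)             -- wait() returns its slots: only now task C starts
local final_order = table.concat(granted, ", ")

print(after_first_give)   -- task A, task B
print(final_order)        -- task A, task B, wait, task C
```

In this model task C could have used the slot freed by task A, yet it only
starts after the waiter has been served and has released its slots.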

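With those issues in mind, here is a second version that keeps the semaphore
for limiting concurrency, but tracks pending tasks with a counter and wakes up
the waiters explicitly: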

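-- "recipe" built on top of copas:
local copas = require "copas"

-- On Lua 5.1 plain pcall cannot be yielded across; use coxpcall if available.
local pcall = pcall
if _VERSION == "Lua 5.1" then
  local found, coxpcall = pcall(require, "coxpcall")
  if found then pcall = coxpcall.pcall end
end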

local limitset = {}
limitset.__index = limitset

local function limitset_new(max)
   assert(type(max) == 'number' and math.floor(max) == max and max > 0,
      'parameter max has to be a positive integer')
   return setmetatable({
      count=0,
      waiting={},
      sem=copas.semaphore.new(max, max),
   }, limitset)
end

function limitset:addthread(func, ...)
   self.count = self.count + 1

   return copas.addthread(function(...)
      self.sem:take()
      local ok, errobj = pcall(func, ...)
      self.sem:give()
      self.count = self.count - 1

      while self.count == 0 do
         local coro = next(self.waiting)
         if not coro then break end
         self.waiting[coro] = nil
         -- copas.wakeup does not switch context, but just in case we re-check
         -- self.count on every iteration, in case a woken-up co-routine added
         -- more threads to this limitset in the meantime.
         copas.wakeup(coro)
      end

      -- Yes, the stack trace is lost.  If you want to keep it, wrap the
      -- function with xpcall(func, your_own_handler).  You don't need a
      -- closure for that, as this call also works:
      --    mylimitset:addthread(xpcall, myfunc, myhandler, ...)
      -- (the extra args only work on LuaJIT or Lua >= 5.2).
      -- This was the same behavior as with ye-olde copas limitset.
      -- Test `ok` rather than `errobj`: on success errobj holds func's first
      -- return value, which must not be raised as an error.
      if not ok then error(errobj) end
   end, ...)
end

function limitset:wait()
   -- there's no need to pause if there are no running co-routines:
   if self.count == 0 then return end
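   -- On Lua >= 5.2 coroutine.running() also returns a co-routine for the main
   -- thread, flagged by a second boolean result.
   local co, is_main = coroutine.running()
   assert(co and not is_main, 'cannot wait from main thread')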
   self.waiting[co] = true
   copas.pauseforever()
end



If you are curious, this was the last official version of limitsets, just
before they were removed:

https://github.com/lunarmodules/copas/blob/v2_0_2/src/copas/limit.lua

As we have seen, re-implementing this structure is not trivial, so here is my
attempt at writing one of the missing pieces of code that would enable
backwards compatibility with older Copas versions.