# fairy.coffee
Queue System Treats Tasks Fairly.

Fairy is a lightweight queue engine for node.js based on Redis. Fairy offers a feature similar to ActiveMQ's message groups, which guarantees that tasks belonging to the same group are processed in sequential order. Unlike message groups, however, Fairy doesn't always route the tasks of a group to the same worker, because doing so introduces unwanted waiting time when:

Fairy will route the task of group

Fairy takes a different approach than message groups. Instead of routing all tasks of the same group to the same consumer, Fairy routes a task to any worker when no task of the same group is being processed. This design makes Fairy ideal for the following requirements:
Copyright © 2012, Baoshan Sheng. Released under the MIT License.
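## Fairy in a Nutshell

Fairy depends on `node-uuid` (to generate task and worker ids), the `redis` client for node.js, and node's built-in `os` module (to identify the host a worker runs on):

```coffeescript
uuid = require 'node-uuid'
redis = require 'redis'
os = require 'os'
```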
A constant prefix is applied to all Redis keys for safety and ease of management.

```coffeescript
prefix = 'FAIRY'
```
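## CommonJS Module Definition

The only exposed object is a `connect` method, which returns a new `Fairy` object.

Its `options` argument describes the Redis connection: `port`, `host`, an optional `password`, and `options`, which is passed through to `redis.createClient`.

```coffeescript
exports.connect = (options = {}) ->
  new Fairy options
```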
## Exception / Interruption Handling

Use
Module-wide flag instructing all queues to exit after processing their current task.

```coffeescript
exiting = off
```
Keep all workers registered by the current process in an array; on exit, this array is used to count the workers that still have to clean up.

```coffeescript
registered_workers = []
```
Log active workers while waiting for all workers to clean up.

```coffeescript
log_registered_workers = ->
  console.log "\nFairy is waiting for #{registered_workers.length} workers to clean-up before exit:"
  for registered_worker in registered_workers
    worker_info = registered_worker.split '|'
    console.log " * Client Id: [#{worker_info[0]}], Task: [#{worker_info[1]}]"

cleanup_required = off
```
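Fairy enters cleanup mode before exiting when one of the signals listed below is captured, or when an uncaught exception occurs. In cleanup mode, the `exiting` flag is switched on so that every worker stops after its current task, and the workers still registered are logged.

If there are no registered workers, exit directly.

```coffeescript
enter_cleanup_mode = ->
  if registered_workers.length
    log_registered_workers()
    cleanup_required = on
    exiting = on
  else
    return process.exit()
```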
When any of the signals below is captured, exit gracefully: notify all workers to enter cleanup mode, and exit once all of them have cleaned up.

```coffeescript
process.on 'SIGINT', enter_cleanup_mode
process.on 'SIGHUP', enter_cleanup_mode
process.on 'SIGQUIT', enter_cleanup_mode
process.on 'SIGUSR1', enter_cleanup_mode
process.on 'SIGUSR2', enter_cleanup_mode
process.on 'SIGTERM', enter_cleanup_mode
process.on 'SIGABRT', enter_cleanup_mode
```
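When an uncaught exception occurs, log its stack trace and enter cleanup mode; registered workers block the groups they are processing before exit.

```coffeescript
process.on 'uncaughtException', (err) ->
  console.log 'Exception:', err.stack
  console.log 'Fairy workers will block their processing groups before exit.' if registered_workers.length
  enter_cleanup_mode()
```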
Say goodbye on exit.

```coffeescript
process.on 'exit', ->
  console.log "Fairy cleaned up, exiting..." if cleanup_required
```
## Helper Methods

### Get Public IP

Fairy embeds the IP address of each worker's environment in the worker's registration record to facilitate management. The first non-internal IPv4 address found is used.

```coffeescript
server_ip = ->
  for card, addresses of os.networkInterfaces()
    for address in addresses
      return address.address if not address.internal and address.family is 'IPv4'
  return 'UNKNOWN_IP'
```
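To make the interface scan concrete, here is a minimal sketch of the same lookup as a pure function over a hand-written fixture shaped like the result of `os.networkInterfaces()`, so it runs without touching the real network configuration. The name `first_external_ipv4` and the fixture are hypothetical; the sketch also accepts the numeric family `4`, which some Node.js 18 releases reported instead of `'IPv4'`.

```coffeescript
# Hypothetical pure variant of server_ip: scan a map of interface name ->
# address list (the shape returned by os.networkInterfaces()) and return
# the first non-internal IPv4 address, or 'UNKNOWN_IP' if there is none.
first_external_ipv4 = (interfaces) ->
  for card, addresses of interfaces
    for address in addresses
      # Accept both the string and the numeric form of the family field.
      is_ipv4 = address.family is 'IPv4' or address.family is 4
      return address.address if is_ipv4 and not address.internal
  'UNKNOWN_IP'

# A made-up fixture: a loopback card, then a card with IPv6 and IPv4 addresses.
interfaces =
  lo: [{address: '127.0.0.1', family: 'IPv4', internal: true}]
  eth0: [
    {address: 'fe80::1', family: 'IPv6', internal: false}
    {address: '192.168.1.20', family: 'IPv4', internal: false}
  ]

console.log first_external_ipv4 interfaces   # 192.168.1.20
console.log first_external_ipv4 {}           # UNKNOWN_IP
```

Note that a non-internal address is not necessarily publicly routable; on a LAN this returns the private address.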
### Create Redis Client

```coffeescript
create_client = (options) ->
  client = redis.createClient options.port, options.host, options.options
  client.auth options.password if options.password?
  client
```
## Class Fairy

Module-wide counter used to allocate an increasing integer id to each `Fairy` object.

```coffeescript
fairy_id = 0
```

Objects of class `Fairy` are created by `exports.connect`.

```coffeescript
class Fairy
```
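### Constructor

The constructor of class `Fairy` creates a Redis client from the given options, assigns the object an id, and initializes an empty pool of queues.

```coffeescript
  constructor: (@options) ->
    @redis = create_client @options
    @id = fairy_id++
    @queue_pool = {}
```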
### Function to Resolve Key Name

Private method to generate prefixed keys. Keys used by objects of class `Fairy` have the form `FAIRY:<key>`.

```coffeescript
  key: (key) -> "#{prefix}:#{key}"
```
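### Get a Named Queue

If the named queue can be found in the `queue_pool` object, return it. Otherwise, add its name to the `QUEUES` set in Redis, then create a new `Queue` object and cache it in the pool.

```coffeescript
  queue: (name) ->
    return @queue_pool[name] if @queue_pool[name]
    @redis.sadd @key('QUEUES'), name
    @queue_pool[name] = new Queue @, name
```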
### Get All Queues Asynchronously

Return the named queues whose names are stored in the `QUEUES` set.

```coffeescript
  queues: (callback) =>
    @redis.smembers @key('QUEUES'), (err, res) =>
      return callback err if err
      callback null, res.map (name) => @queue name
```
### Get Statistics for All Queues Asynchronously

```coffeescript
  statistics: (callback) =>
    @queues (err, queues) ->
      return callback err if err
      return callback null, [] unless total_queues = queues.length
      result = []
      for queue, i in queues
        do (queue, i) ->
          queue.statistics (err, statistics) ->
            return callback err if err
            result[i] = statistics
            callback null, result if callback unless --total_queues
```
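`statistics` on a `Fairy` object fans out one asynchronous `queue.statistics` call per queue and fires the callback when a countdown reaches zero, storing each result at its original index so the output order matches the queue order. A minimal standalone sketch of that pattern, with a hypothetical `gather` helper and fake lookups standing in for `queue.statistics`; unlike the loop above, it stops after the first error so the callback fires at most once.

```coffeescript
# Hypothetical helper: run an async, node-style function for every item and
# call back once with all results in input order (or with the first error).
gather = (items, fn, callback) ->
  return callback null, [] unless remaining = items.length
  results = []
  failed = false
  for item, i in items
    do (item, i) ->
      fn item, (err, result) ->
        return if failed
        if err
          failed = true
          return callback err
        results[i] = result
        callback null, results unless --remaining
  return

# Fake asynchronous lookups that complete in reverse order of start.
fake_lookup = (name, callback) ->
  setTimeout (-> callback null, name.toUpperCase()), 10 - name.length

gather ['a', 'bb', 'ccc'], fake_lookup, (err, results) ->
  console.log results   # [ 'A', 'BB', 'CCC' ]
```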
## Class Queue

Objects of class `Queue` are created through the `queue` method of a `Fairy` object.

```coffeescript
class Queue
```
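### Constructor

The constructor of class `Queue` keeps a reference to its `Fairy` object and its name, and shares the `Fairy` object's Redis client.

```coffeescript
  constructor: (@fairy, @name) ->
    @redis = @fairy.redis
```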
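### Function to Resolve Key Name

Private method to generate keys that are prefixed with `FAIRY` and suffixed with the queue name, e.g. `FAIRY:SOURCE:<queue name>`.

```coffeescript
  key: (key) -> "#{prefix}:#{key}:#{@name}"
```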
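The two `key` methods together give Redis keys a layered layout: `FAIRY:<KEY>` for Fairy-wide keys, `FAIRY:<KEY>:<queue>` for per-queue keys, and, as `_poll` and `_continue_group` build them, `FAIRY:QUEUED:<queue>:<group>` for each group's pending list. A sketch of that scheme as standalone functions; `fairy_key`, `queue_key` and `group_key` are illustrative names, not part of Fairy.

```coffeescript
prefix = 'FAIRY'

# Fairy-wide keys, e.g. the set of all queue names.
fairy_key = (key) -> "#{prefix}:#{key}"

# Per-queue keys, e.g. the SOURCE list of queue 'mail'.
queue_key = (key, queue) -> "#{prefix}:#{key}:#{queue}"

# Per-group lists that keep a group's tasks in order.
group_key = (queue, group) -> "#{queue_key('QUEUED', queue)}:#{group}"

console.log fairy_key 'QUEUES'            # FAIRY:QUEUES
console.log queue_key 'SOURCE', 'mail'    # FAIRY:SOURCE:mail
console.log group_key 'mail', 'user42'    # FAIRY:QUEUED:mail:user42
```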
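### Configurable Parameters

Prototype-inherited parameters, which can be overridden by instance properties, include:

- `polling_interval`: milliseconds to wait before polling again when no task is available.
- `retry_limit`: how many times a failed task is retried.
- `retry_delay`: milliseconds to wait before each retry.
- `recent_size`: how many recently finished tasks are kept.
- `slowest_size`: how many of the slowest tasks are kept.

```coffeescript
  polling_interval : 5
  retry_limit : 2
  retry_delay : 0.1 * 1000
  recent_size : 10
  slowest_size : 10
```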
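### Placing Tasks

Tasks are pushed into the `SOURCE` list of the queue. The first argument identifies the task's group (it is also added to the `GROUPS` set), the remaining arguments are the task's parameters, and an optional node-style callback may be passed last.

Usage: `queue.enqueue group, params..., callback`

A transaction ensures atomicity.

```coffeescript
  enqueue: (args..., callback) =>
    if typeof callback isnt 'function'
      args.push callback
      callback = undefined
    @redis.multi()
      .rpush(@key('SOURCE'), JSON.stringify([uuid.v4(), args..., Date.now()]))
      .sadd(@key('GROUPS'), args[0])
      .hincrby(@key('STATISTICS'), 'TOTAL', 1)
      .exec(callback)
```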
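Each entry that `enqueue` pushes onto `SOURCE` is a JSON array laid out as `[id, group, params..., queued_time]`, and `_process` later hands the handler the slice `task[1...-1]`, i.e. the group followed by the parameters. A sketch of that round trip without Redis; `encode_task` and `handler_args` are hypothetical names, and a fixed id and timestamp replace `uuid.v4()` and `Date.now()` so the output is deterministic.

```coffeescript
# Build the entry enqueue would push: id, group, params..., enqueue time.
encode_task = (id, args, now) ->
  JSON.stringify [id, args..., now]

# Arguments the registered handler receives (before its callback):
# everything except the leading id and the trailing timestamp.
handler_args = (entry) ->
  task = JSON.parse entry
  task[1...-1]

entry = encode_task 'id-1', ['user42', 'send', {to: 'a@example.com'}], 1000
console.log entry
# ["id-1","user42","send",{"to":"a@example.com"},1000]
console.log handler_args entry
```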
### Register Handler

When a processing handler function is registered, the queue automatically becomes a worker: Fairy immediately starts polling for tasks and processes them as they arrive. On becoming a worker, Fairy registers a UUID (v4) key in the `WORKERS` hash, whose value records the host name, IP address, process id and registration time; the entry is removed when the process exits. If an uncaught exception occurs while a task is being processed, that task fails and its group is blocked; otherwise the worker tries to exit.

```coffeescript
  regist: (@handler) =>
    registered_workers.push "#{@fairy.id}|#{@name}"
    worker_id = uuid.v4()
    @redis.hset @key('WORKERS'), worker_id, "#{os.hostname()}|#{server_ip()}|#{process.pid}|#{Date.now()}"
    process.on 'uncaughtException', (err) =>
      if @_handler_callback
        console.log "Worker [#{worker_id.split('-')[0]}] registered for Task [#{@name}] will block its current processing group"
        @_handler_callback {do: 'block', message: err.stack}
      else
        @_try_exit()
    process.on 'exit', => @redis.hdel @key('WORKERS'), worker_id
    @_poll()
```
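### Poll New Task

Private method. If any task is present in the `SOURCE` list, pop it and push it onto the `QUEUED` list of its group. Since the task being popped and pushed must be known before the transaction begins (that is, before `MULTI`), it is first read with `LINDEX` while `SOURCE` is watched; if another client changes `SOURCE` in the meantime, the transaction is discarded and polling starts over.

If there are no pending tasks of the same group (the group's list holds only this task after the push), process the task immediately; otherwise leave it queued behind the group's current task and poll again. If there are no tasks in the `SOURCE` list, poll again after `polling_interval` milliseconds.

```coffeescript
  _poll: =>
    return @_try_exit() if exiting
    @redis.watch @key('SOURCE')
    @redis.lindex @key('SOURCE'), 0, (err, res) =>
      if res
        task = JSON.parse res
        @redis.multi()
          .lpop(@key('SOURCE'))
          .rpush("#{@key('QUEUED')}:#{task[1]}", res)
          .exec (multi_err, multi_res) =>
            return @_poll() unless multi_res and multi_res[1] is 1
            @_process task
      else
        @redis.unwatch()
        setTimeout @_poll, @polling_interval
```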
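The routing rule in `_poll` can be modelled with plain arrays. In this in-memory sketch (not Fairy's Redis code; `poll_once` and the state shape are hypothetical), a worker moves the head of `source` to the end of its group's list and only processes it when it is the sole entry, i.e. when no other task of that group is pending or being processed. A `null` result corresponds to `_poll` simply polling again.

```coffeescript
# state.source: array of [id, group]; state.groups: group -> array of tasks.
# Returns the task the calling worker should process now, or null.
poll_once = (state) ->
  return null unless state.source.length
  task = state.source.shift()              # like LPOP on SOURCE
  state.groups[task[1]] ?= []
  queued = state.groups[task[1]]
  queued.push task                         # like RPUSH on QUEUED:<group>
  if queued.length is 1 then task else null

state =
  source: [['t1', 'A'], ['t2', 'A'], ['t3', 'B']]
  groups: {}

# Group A is idle, so t1 is processed immediately.
console.log poll_once(state)   # [ 't1', 'A' ]
# t2 is queued behind t1 and left for the worker that owns group A.
console.log poll_once(state)   # null
# Group B is idle, so t3 can go to any worker.
console.log poll_once(state)   # [ 't3', 'B' ]
```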
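### Exit When All Queues are Cleaned Up

Private method. Unregister this worker, then exit the process immediately if no other registered workers remain; otherwise, log the workers that are still being waited for.

```coffeescript
  _try_exit: =>
    # Look the index up first: written inline without parentheses, the `1`
    # would be passed to indexOf instead of splice.
    index = registered_workers.indexOf "#{@fairy.id}|#{@name}"
    registered_workers.splice index, 1 if index >= 0
    process.exit() unless registered_workers.length
    log_registered_workers()
```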
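### Process Each Group's First Task

Private method. The real job is done by the passed-in `handler` function, which receives the task's group and parameters followed by a callback.

Calling the callback function is your responsibility. Otherwise, the worker never moves on to another task, and the task's group stays stuck behind it.

```coffeescript
  _process: (task) =>
    @redis.hset @key('PROCESSING'), task[0], JSON.stringify([task..., start_time = Date.now()])
```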
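Before processing the task, remember its id, reset the retry counter and the list of errors, and prepare the handler callback:

```coffeescript
    processing = task[0]
    retry_count = @retry_limit
    errors = []
    @_handler_callback = handler_callback = (err, res) =>
      @_handler_callback = null
```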
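Error handling routine. The handler may pass an error whose `do` property selects the behavior:

- `block`: record the task in the `FAILED` list, remove it from `PROCESSING`, add its group to the `BLOCKED` set, and poll for the next task.
- `block-after-retry`: retry the task up to `retry_limit` times, waiting `retry_delay` milliseconds before each retry, then fail it and block its group as above.
- Any other error: retry in the same way, then record the task in `FAILED` and skip it, continuing with the next task of its group.

```coffeescript
      if err
        errors.push err.message or null
        switch err.do
          when 'block'
            @redis.multi()
              .rpush(@key('FAILED'), JSON.stringify([task..., Date.now(), errors]))
              .hdel(@key('PROCESSING'), processing)
              .sadd(@key('BLOCKED'), task[1])
              .exec()
            return @_poll()
          when 'block-after-retry'
            return setTimeout call_handler, @retry_delay if retry_count--
            @redis.multi()
              .rpush(@key('FAILED'), JSON.stringify([task..., Date.now(), errors]))
              .hdel(@key('PROCESSING'), processing)
              .sadd(@key('BLOCKED'), task[1])
              .exec()
            return @_poll()
          else
            return setTimeout call_handler, @retry_delay if retry_count--
            @redis.multi()
              .rpush(@key('FAILED'), JSON.stringify([task..., Date.now(), errors]))
              .hdel(@key('PROCESSING'), processing)
              .exec()
```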
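Success handling routine: remove the task from `PROCESSING`; update the `FINISHED` counter and the total pending and processing times in `STATISTICS`; push the task onto the `RECENT` list, trimmed to `recent_size` entries; and add it to the `SLOWEST` sorted set, scored by processing time and trimmed to `slowest_size` entries. Then, whether the task succeeded or was skipped, continue processing its group. Finally, the handler is invoked right away through `call_handler`, which the retry branches above reuse.

```coffeescript
      else
        finish_time = Date.now()
        process_time = finish_time - start_time
        @redis.multi()
          .hdel(@key('PROCESSING'), processing)
          .hincrby(@key('STATISTICS'), 'FINISHED', 1)
          .hincrby(@key('STATISTICS'), 'TOTAL_PENDING_TIME', start_time - task[task.length - 1])
          .hincrby(@key('STATISTICS'), 'TOTAL_PROCESS_TIME', process_time)
          .lpush(@key('RECENT'), JSON.stringify([task..., finish_time]))
          .ltrim(@key('RECENT'), 0, @recent_size - 1)
          .zadd(@key('SLOWEST'), process_time, JSON.stringify([task..., start_time]))
          .zremrangebyrank(@key('SLOWEST'), 0, - @slowest_size - 1)
          .exec()
      @_continue_group task[1]
    do call_handler = =>
      @handler task[1...-1]..., (@_handler_callback = handler_callback)
```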
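The branches of the handler callback boil down to a small decision table. A pure-function sketch of it, assuming the caller tracks the remaining retries the way `_process` does with `retry_count`; the name `next_action` and the returned strings are hypothetical.

```coffeescript
# Decide what _process does after the handler reports back.
#   err: null on success, or an object such as {do: 'block', message: '...'}
#   retries_left: how many retries remain (starts at retry_limit)
# Returns 'finish', 'retry', 'block' (fail and block the group), or
# 'skip' (record the failure, then continue with the group's next task).
next_action = (err, retries_left) ->
  return 'finish' unless err
  return 'block' if err.do is 'block'
  return 'retry' if retries_left > 0
  if err.do is 'block-after-retry' then 'block' else 'skip'

console.log next_action null, 2                          # finish
console.log next_action {do: 'block'}, 2                 # block
console.log next_action {do: 'block-after-retry'}, 1     # retry
console.log next_action {do: 'block-after-retry'}, 0     # block
console.log next_action {message: 'timeout'}, 0          # skip
```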
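### Continue Processing a Group

Private method. Upon successful execution of a task, or after skipping a failed task:

1. If the group has another task queued behind the finished one, pop the finished task and process the next one right away (or, when exiting, requeue the rest of the group instead).
2. Otherwise, pop the finished task and poll the `SOURCE` list for new tasks (or try to exit, when exiting).

Step 2 is protected by a transaction on the group's list to prevent multiple workers from processing tasks of the same group at once: if another worker pushes a task into the group between the check and the pop, the transaction is discarded and the check is repeated, so the new task is processed by this worker instead of being left waiting.

```coffeescript
  _continue_group: (group) =>
    @redis.watch "#{@key('QUEUED')}:#{group}"
    @redis.lindex "#{@key('QUEUED')}:#{group}", 1, (err, res) =>
      if res
        task = JSON.parse res
        @redis.unwatch()
        @redis.lpop "#{@key('QUEUED')}:#{group}"
        return @_requeue_group group if exiting
        @_process task
      else
        @redis.multi()
          .lpop("#{@key('QUEUED')}:#{group}")
          .exec (multi_err, multi_res) =>
            return @_continue_group group unless multi_res
            return @_try_exit() if exiting
            @_poll()
```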
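Continuing a group can be modelled the same way. In this in-memory sketch (hypothetical `continue_group`, no Redis and no transaction), the finished task is removed from the head of its group's list, and the next task of the group, if any, stays at the head and is returned for the same worker to process; `null` means the worker should go back to polling `SOURCE`.

```coffeescript
# groups: group -> array of tasks; the head is the task just finished.
continue_group = (groups, group) ->
  queued = groups[group]
  queued.shift()                # drop the finished (or skipped) task
  if queued.length then queued[0] else null

groups = {A: [['t1', 'A'], ['t2', 'A']]}
console.log continue_group(groups, 'A')   # [ 't2', 'A' ]
console.log continue_group(groups, 'A')   # null
```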
### Requeue Tasks on Exit

Private method. Before exiting, requeue all remaining tasks of the group being processed into the `SOURCE` list, so that other workers can pick them up.

To ensure the correct order of tasks, they are pushed onto the head of `SOURCE` in reverse order, so the group's earliest task ends up first. When the tasks are requeued successfully, try to exit.

```coffeescript
  _requeue_group: (group) =>
    @redis.watch "#{@key('QUEUED')}:#{group}"
    @redis.lrange "#{@key('QUEUED')}:#{group}", 0, -1, (err, res) =>
      @redis.multi()
        .lpush("#{@key('SOURCE')}", res.reverse()...)
        .del("#{@key('QUEUED')}:#{group}")
        .exec (multi_err, multi_res) =>
          return @_requeue_group group unless multi_res
          return @_try_exit()
```
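Why `res.reverse()`? Redis `LPUSH` with several values inserts them one after another at the head, so the last value ends up first. Pushing the group's tasks in reverse order therefore puts them back at the head of `SOURCE` in their original order. A sketch that emulates `LPUSH` on a plain array; the helper `lpush` is a stand-in, not the Redis client.

```coffeescript
# Emulate Redis LPUSH key v1 v2 ...: each value is inserted at the head in turn.
lpush = (list, values...) ->
  list.unshift value for value in values
  list.length

source = ['s1', 's2']
group_tasks = ['g1', 'g2', 'g3']         # oldest first, as LRANGE 0 -1 returns
# slice() copies first, so group_tasks itself is not reversed in place.
lpush source, group_tasks.slice().reverse()...
console.log source   # [ 'g1', 'g2', 'g3', 's1', 's2' ]
```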
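### Re-Schedule Failed and Blocked Tasks

Requeue the failed tasks, and the tasks of blocked groups, into the `SOURCE` list; then clear the `FAILED` list, the `QUEUED` lists of the blocked groups, and the `BLOCKED` set.

The commands above are protected by a transaction. Usage: `queue.reschedule (err, statistics) -> ...`

```coffeescript
  reschedule: (callback) =>
    client = create_client @fairy.options
    do reschedule = =>
```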
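Make sure no task is being processed: watch the keys involved, and start over if the `PROCESSING` hash is not empty.

```coffeescript
      client.watch @key('FAILED')
      client.watch @key('SOURCE')
      client.watch @key('BLOCKED')
      client.watch @key('PROCESSING')
      client.hlen @key('PROCESSING'), (err, res) =>
        if res
          client.unwatch()
          return reschedule()
```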
Push all failed tasks, without their last two fields (the failure time and the error messages), into a temporary array of tasks to be rescheduled. Then, get all blocked groups.

```coffeescript
        @failed_tasks (err, tasks) =>
          requeued_tasks = []
          requeued_tasks.push tasks.map((task) -> JSON.stringify [task.id, task.params..., task.queued.valueOf()])...
          @blocked_groups (err, groups) =>
```
Make sure the `QUEUED` lists of all blocked groups are watched as well. Commit the transaction; if a concurrent modification aborted it, start the reschedule over; otherwise the reschedule is finished, and the callback receives the queue's statistics.

```coffeescript
            client.watch groups.map((group) => "#{@key('QUEUED')}:#{group}")... if groups.length
            start_transaction = =>
              multi = client.multi()
              multi.lpush @key('SOURCE'), requeued_tasks.reverse()... if requeued_tasks.length
              multi.del @key 'FAILED'
              multi.del groups.map((group) => "#{@key('QUEUED')}:#{group}")... if groups.length
              multi.del @key 'BLOCKED'
              multi.exec (multi_err, multi_res) =>
                if multi_err
                  client.quit()
                  return callback multi_err
                if multi_res
                  client.quit()
                  @statistics callback
                else
                  reschedule()
```
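If there are blocked groups, read each group's queued tasks except the first one (the failed task, which is already in `FAILED`), add them to the temporary array, and start the transaction once all groups have been read. Otherwise, start the transaction immediately.

```coffeescript
            if total_groups = groups.length
              for group in groups
                client.lrange "#{@key('QUEUED')}:#{group}", 1, -1, (err, res) =>
                  requeued_tasks.push res...
                  start_transaction() unless --total_groups
            else start_transaction()
```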
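Putting the reschedule steps together, the entries pushed back onto `SOURCE` are the failed tasks, rebuilt without their failure time and error list, followed by every blocked group's queued tasks except the first. A pure sketch of that assembly; `requeue_entries` and its input shapes are assumptions that mirror what `failed_tasks` returns and what `LRANGE ... 1 -1` reads for each group.

```coffeescript
# failed: objects shaped like failed_tasks results ({id, params, queued: Date, ...}).
# blocked_queues: group -> raw JSON entries of the group's QUEUED list.
requeue_entries = (failed, blocked_queues) ->
  entries = (JSON.stringify [t.id, t.params..., t.queued.valueOf()] for t in failed)
  for group, queued of blocked_queues
    entries.push queued[1..]...   # skip the head: it is the failed task itself
  entries

failed = [{id: 'f1', params: ['A', 'x'], queued: new Date(1000)}]
blocked = {A: [JSON.stringify(['f1', 'A', 'x', 1000]), JSON.stringify(['t2', 'A', 'y', 2000])]}
console.log requeue_entries(failed, blocked)
```

The failed task `f1` appears once, rebuilt from `FAILED`, and the group's head copy is skipped.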
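### Get Recently Finished Tasks Asynchronously

Recently finished tasks are the tasks stored in the `RECENT` list, most recent first; at most `recent_size` of them are kept.

Each returned task has the fields `id`, `params` (the group followed by the task's parameters), and the `Date` fields `finished` and `queued`.

Usage: `queue.recently_finished_tasks (err, tasks) -> ...`

```coffeescript
  recently_finished_tasks: (callback) =>
    @redis.lrange @key('RECENT'), 0, -1, (err, res) ->
      return callback err if err
      callback null, res.map (entry) ->
        entry = JSON.parse entry
        id: entry[0]
        params: entry[1..-3]
        finished: new Date entry.pop()
        queued: new Date entry.pop()
```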
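### Get Failed Tasks Asynchronously

Failed tasks are stored in the `FAILED` list, oldest first.

Each returned task has the fields `id`, `params` (the group followed by the task's parameters), `reason` (the error messages collected over all attempts), and the `Date` fields `failed` and `queued`.

Usage: `queue.failed_tasks (err, tasks) -> ...`

```coffeescript
  failed_tasks: (callback) =>
    @redis.lrange @key('FAILED'), 0, -1, (err, res) ->
      return callback err if err
      callback null, res.map (entry) ->
        entry = JSON.parse entry
        id: entry[0]
        params: entry[1..-4]
        reason: entry.pop()
        failed: new Date entry.pop()
        queued: new Date entry.pop()
```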
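The list readers rely on CoffeeScript's inclusive slices: `entry[1..-4]` is `entry.slice(1, -3)`, which drops the id and the three trailing fields of a failed entry (queued time, failure time, errors), leaving the group and parameters. The fields are then popped from the end in reverse order. A self-contained sketch of the same decoding applied to a hand-made `FAILED` entry; `decode_failed` is a hypothetical name, and the layout follows `_process`.

```coffeescript
# A FAILED entry is [id, group, params..., queued, failed_at, errors].
decode_failed = (raw) ->
  entry = JSON.parse raw
  id: entry[0]
  params: entry[1..-4]          # same as entry.slice(1, -3)
  reason: entry.pop()           # errors, the last field
  failed: new Date entry.pop()  # then the failure time
  queued: new Date entry.pop()  # then the enqueue time

raw = JSON.stringify ['id-1', 'user42', 'send', 1000, 5000, ['timeout']]
task = decode_failed raw
console.log task.params, task.reason   # [ 'user42', 'send' ] [ 'timeout' ]
```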
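### Get Blocked Groups Asynchronously

Blocked groups' identifiers are stored in the `BLOCKED` set. The callback receives an array of group identifiers, i.e. the values that were passed as the first argument of `enqueue`.

Usage: `queue.blocked_groups (err, groups) -> ...`

```coffeescript
  blocked_groups: (callback) ->
    @redis.smembers @key('BLOCKED'), (err, res) ->
      return callback err if err
      callback null, res.map (entry) ->
        # Group ids are stored as plain strings; only some (e.g. numbers) are
        # valid JSON, so fall back to the raw string instead of throwing.
        try
          JSON.parse entry
        catch error
          entry
```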
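### Get Slowest Tasks Asynchronously

Slowest tasks are the tasks stored in the `SLOWEST` sorted set, scored by processing time; at most `slowest_size` of them are kept, and they are returned slowest first.

Each returned task has the fields `id`, `params`, `time` (the processing time in milliseconds), and the `Date` fields `started` and `queued`.

Usage: `queue.slowest_tasks (err, tasks) -> ...`

```coffeescript
  slowest_tasks: (callback) ->
    @redis.zrevrange @key('SLOWEST'), 0, -1, "WITHSCORES", (err, res) ->
      return callback err if err
      res = res.map (entry) -> JSON.parse entry
      callback null, ([res[i]..., res[i + 1]] for i in [0...res.length] by 2).map (entry) ->
        id: entry[0]
        params: entry[1..-4]
        time: entry.pop()
        started: new Date entry.pop()
        queued: new Date entry.pop()
```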
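`ZREVRANGE ... WITHSCORES` replies with a flat list that alternates members and scores, highest score first; `slowest_tasks` folds each score (the processing time) back onto its entry. A sketch of that pairing over a hand-written reply; `pair_with_scores` is hypothetical, and scores arrive as strings from node_redis, so they are parsed with `JSON.parse` just like the members.

```coffeescript
# reply: [member1, score1, member2, score2, ...] as strings.
# Each member is a JSON entry [id, group, params..., queued, started].
pair_with_scores = (reply) ->
  values = reply.map (item) -> JSON.parse item
  pairs = ([values[i]..., values[i + 1]] for i in [0...values.length] by 2)
  pairs.map (entry) ->
    id: entry[0]
    params: entry[1..-4]
    time: entry.pop()
    started: new Date entry.pop()
    queued: new Date entry.pop()

reply = [
  JSON.stringify(['t9', 'A', 'x', 1000, 1500]), '320'
  JSON.stringify(['t3', 'B', 'y', 2000, 2100]), '45'
]
slowest = pair_with_scores reply
console.log ("#{t.id}: #{t.time} ms" for t in slowest)   # [ 't9: 320 ms', 't3: 45 ms' ]
```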
### Get Processing Tasks Asynchronously

Currently processing tasks are the tasks in the `PROCESSING` hash.

Each returned task has the fields `id`, `params`, and the `Date` fields `start` and `queued`.

Usage: `queue.processing_tasks (err, tasks) -> ...`

```coffeescript
  processing_tasks: (callback) ->
    @redis.hvals @key('PROCESSING'), (err, res) ->
      return callback err if err
      callback null, res.map (entry) ->
        entry = JSON.parse(entry)
        id: entry[0]
        params: entry[1..-3]
        start: new Date entry.pop()
        queued: new Date entry.pop()
```
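### Get Source Tasks Asynchronously

Get tasks in the `SOURCE` list, i.e. tasks not yet routed to a worker. Accepted parameters are `skip` (the number of tasks to skip, default 0), `take` (the number of tasks to return, default 10), and the callback.

Each returned task has the fields `id`, `params` and `queued`.

Possible combinations of arguments are `(callback)`, `(skip, callback)` and `(skip, take, callback)`.

Usage: `queue.source_tasks skip, take, (err, tasks) -> ...`

```coffeescript
  source_tasks: (args..., callback) ->
    skip = args[0] or 0
    take = args[1] or 10
    @redis.lrange @key('SOURCE'), skip, skip + take - 1, (err, res) ->
      return callback err if err
      callback null, res.map (entry) ->
        entry = JSON.parse entry
        id: entry[0]
        params: entry[1..-2]
        queued: new Date entry.pop()
```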
### Get Workers Asynchronously

Asynchronous method to get all live workers of the queue. Live workers are registered in the `WORKERS` hash. Arguments of the callback follow the node.js error-handling convention: `(err, workers)`.

Each returned worker has the fields `host`, `ip`, `pid` and `since`; workers are sorted by IP address, then by process id.

Usage: `queue.workers (err, workers) -> ...`

```coffeescript
  workers: (callback) =>
    @redis.hvals @key('WORKERS'), (err, res) ->
      return callback err if err
      callback null, res.map((entry) ->
        entry = entry.split '|'
        host: entry[0]
        ip: entry[1]
        pid: parseInt entry[2]
        since: new Date parseInt entry[3]
      ).sort (a, b) ->
        return 1 if a.ip > b.ip
        return -1 if a.ip < b.ip
        return 1 if a.pid > b.pid
        return -1 if a.pid < b.pid
```
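Worker records are the `|`-separated strings written by `regist` (`host|ip|pid|registration time`). A standalone sketch of the parsing and ordering that `workers` applies, over made-up records; `parse_workers` is hypothetical, and its comparator returns `a.pid - b.pid` for the tie-break, which also yields `0` for equal entries.

```coffeescript
parse_workers = (records) ->
  workers = records.map (record) ->
    [host, ip, pid, since] = record.split '|'
    host: host
    ip: ip
    pid: parseInt pid, 10
    since: new Date parseInt since, 10
  # Order by IP string first, then numerically by pid.
  workers.sort (a, b) ->
    return 1 if a.ip > b.ip
    return -1 if a.ip < b.ip
    a.pid - b.pid

records = [
  'web2|10.0.0.2|400|1000'
  'web1|10.0.0.1|900|2000'
  'web1|10.0.0.1|120|3000'
]
console.log ("#{w.host}:#{w.pid}" for w in parse_workers(records))
# [ 'web1:120', 'web1:900', 'web2:400' ]
```

As in the method above, IPs are compared as strings, so `10.0.0.10` sorts before `10.0.0.2`.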
### Clear A Queue

Remove all tasks of the queue and reset its statistics. Tasks currently being processed stay in the `PROCESSING` hash, so `TOTAL` is set to their number. The operation starts over if `SOURCE` or `PROCESSING` changes concurrently; when it succeeds, the callback, if given, receives the queue's statistics.

```coffeescript
  clear: (callback) =>
    @redis.watch @key('SOURCE')
    @redis.watch @key('PROCESSING')
    @redis.hlen @key('PROCESSING'), (err, processing) =>
      return callback? err if err
      @redis.keys "#{@key('QUEUED')}:*", (err, res) =>
        return callback? err if err
        @redis.multi()
          .del(@key('GROUPS'), @key('RECENT'), @key('FAILED'), @key('SOURCE'), @key('STATISTICS'), @key('SLOWEST'), @key('BLOCKED'), res...)
          .hmset(@key('STATISTICS'), 'TOTAL', processing, 'FINISHED', 0, 'TOTAL_PENDING_TIME', 0, 'TOTAL_PROCESS_TIME', 0)
          .exec (err, res) =>
            return callback? err if err
            return @clear callback unless res
            @statistics callback if callback
```
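### Get Statistics of a Queue Asynchronously

Statistics of a queue include the queue's `name`; the `total` numbers of `groups` and `tasks`; the numbers of `finished_tasks`, `processing_tasks`, `failed_tasks` and `pending_tasks`; the `average_pending_time` and `average_process_time` of finished tasks, in milliseconds; the numbers of `blocked` `groups` and `tasks`; and the number of live `workers`.

If there are no finished tasks, `average_pending_time` and `average_process_time` are set to `'-'`.

Usage: `queue.statistics (err, statistics) -> ...`

```coffeescript
  statistics: (callback) ->
```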
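Start a transaction that reads the number of groups, the `STATISTICS` hash, the numbers of processing and failed tasks, the blocked groups, and the number of live workers:

```coffeescript
    @redis.multi()
      .scard(@key('GROUPS'))
      .hgetall(@key('STATISTICS'))
      .hlen(@key('PROCESSING'))
      .llen(@key('FAILED'))
      .smembers(@key('BLOCKED'))
      .hlen(@key('WORKERS'))
      .exec (multi_err, multi_res) =>
        return callback multi_err if multi_err
```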
Process the result of the transaction.

```coffeescript
        statistics = multi_res[1] or {}
        result =
          name: @name
          total:
            groups: multi_res[0]
            tasks: parseInt(statistics.TOTAL) or 0
          finished_tasks: parseInt(statistics.FINISHED) or 0
          average_pending_time: Math.round(statistics.TOTAL_PENDING_TIME * 100 / statistics.FINISHED) / 100
          average_process_time: Math.round(statistics.TOTAL_PROCESS_TIME * 100 / statistics.FINISHED) / 100
          blocked:
            groups: multi_res[4].length
          processing_tasks: multi_res[2]
          failed_tasks: multi_res[3]
          workers: multi_res[5]
        if result.finished_tasks is 0
          result.average_pending_time = '-'
          result.average_process_time = '-'
```
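Calculate blocked and pending tasks. The number of blocked tasks is the total length of the blocked groups' `QUEUED` lists minus one per group, because the first task of each blocked group is the failed task, already counted in `failed_tasks`. The equation used to calculate pending tasks is:

$$
\text{pending} = \text{total} - \text{finished} - \text{processing} - \text{failed} - \text{blocked}
$$

```coffeescript
        multi2 = @redis.multi()
        multi2.llen "#{@key('QUEUED')}:#{group}" for group in multi_res[4]
        multi2.exec (multi2_err, multi2_res) ->
          return callback multi2_err if multi2_err
          result.blocked.tasks = multi2_res.reduce(((a, b) -> a + b), - result.blocked.groups)
          result.pending_tasks = result.total.tasks - result.finished_tasks - result.processing_tasks - result.failed_tasks - result.blocked.tasks
          callback null, result
```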
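A worked example of the arithmetic in `statistics`, over made-up counts. The function name `derive_statistics` and its input shape are hypothetical; the averages are rounded to two decimals exactly as in the method, and pending tasks are whatever is left of the total once finished, processing, failed and blocked tasks are subtracted.

```coffeescript
derive_statistics = (raw) ->
  finished = raw.finished
  # Averages in ms, rounded to two decimals; '-' when nothing has finished.
  average = (total_time) ->
    if finished then Math.round(total_time * 100 / finished) / 100 else '-'
  # Each blocked group's list starts with its failed task, counted in `failed`.
  blocked_tasks = raw.blocked_queue_lengths.reduce(((a, b) -> a + b), 0) - raw.blocked_queue_lengths.length
  average_pending_time: average raw.total_pending_time
  average_process_time: average raw.total_process_time
  blocked_tasks: blocked_tasks
  pending_tasks: raw.total - finished - raw.processing - raw.failed - blocked_tasks

stats = derive_statistics
  total: 100
  finished: 60
  processing: 3
  failed: 5
  blocked_queue_lengths: [4, 2]
  total_pending_time: 1234
  total_process_time: 3000

# blocked = (4 + 2) - 2 = 4; pending = 100 - 60 - 3 - 5 - 4 = 28
console.log stats.pending_tasks, stats.blocked_tasks, stats.average_pending_time   # 28 4 20.57
```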
Known Bugs: