1.10. Submodule fiber-ipc

The fiber-ipc submodule allows sending and receiving messages between different processes. It also provides a synchronization mechanism for fibers, similar to condition variables and to operating-system functions such as pthread_cond_wait() plus pthread_cond_signal(). In this context, “different processes” means different connections, different sessions, or different fibers.

Call fiber.channel() to allocate space and get a channel object, which will be called channel for examples in this section. Call the other fiber-ipc routines, via channel, to send messages, receive messages, or check ipc status. Message exchange is synchronous. The channel is garbage collected when no one is using it, as with any other Lua object. Use object-oriented syntax, for example channel:put(message) rather than fiber.channel.put(message).

Call fiber.cond() to create a condition variable, which will be called cond for examples in this section. Call cond:wait() to make a fiber wait for a signal via a condition variable. Call cond:signal() to send a signal to wake up a single fiber that has executed cond:wait(). Call cond:broadcast() to send a signal to all fibers that have executed cond:wait().

1.10.1. Channel

fiber.channel([capacity])

Create a new communication channel.

Parameters:
  • capacity (int) – the maximum number of slots (spaces for channel:put messages) that can be in use at once. The default is 0.
Return:

new channel.

Rtype:

userdata, possibly including the string “channel ...”.

object channel_object
channel_object:put(message[, timeout])

Send a message using a channel. If the channel is full, channel:put() waits until there is a free slot in the channel.

Parameters:
  • message (lua-value) – what will be sent, usually a string or number or table
  • timeout (number) – maximum number of seconds to wait for a slot to become free
Return:

If timeout is specified, and there is no free slot in the channel for the duration of the timeout, then the return value is false. If the channel is closed, then the return value is false. Otherwise, the return value is true, indicating success.

Rtype:

boolean
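The return values described above can be sketched as follows. This is a minimal illustration, assuming the code runs inside a Tarantool instance (in the console or in a script); the variable names are chosen for the example only. The channel has two slots and no reader, so the third channel:put() finds no free slot and gives up when its timeout expires.

```lua
local fiber = require('fiber')

-- A channel with two slots; nobody is reading from it.
local channel = fiber.channel(2)

-- Both slots are free, so these calls return true without waiting.
local first = channel:put('a')
local second = channel:put('b')

-- The channel is full now. This call waits up to 0.1 seconds
-- for a free slot and then returns false.
local third = channel:put('c', 0.1)
```

Without the timeout argument, the third call would wait until some other fiber removed a message with channel:get() or closed the channel.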

channel_object:close()

Close the channel. All waiters in the channel will stop waiting. All following channel:get() operations will return nil, and all following channel:put() operations will return false.
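A short sketch of the behavior described above, assuming a Tarantool instance. The fiber named reader is hypothetical: it blocks in channel:get() and is released when the channel is closed. The sketch makes the fiber joinable only so that the main fiber can collect its result.

```lua
local fiber = require('fiber')

-- Default capacity (0), so there are no buffered messages to consider.
local channel = fiber.channel()

-- The reader blocks in channel:get() because the channel is empty.
local reader = fiber.create(function()
    return channel:get()
end)
reader:set_joinable(true)

-- Closing the channel wakes the blocked reader; its get() returns nil.
channel:close()
local joined, message = reader:join()

-- Operations that follow the close fail immediately.
local put_ok = channel:put('late')    -- false
local closed = channel:is_closed()    -- true
```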

channel_object:get([timeout])

Fetch and remove a message from a channel. If the channel is empty, channel:get() waits for a message.

Parameters:
  • timeout (number) – maximum number of seconds to wait for a message
Return:

If timeout is specified, and there is no message in the channel for the duration of the timeout, then the return value is nil. If the channel is closed, then the return value is nil. Otherwise, the return value is the message placed on the channel by channel:put().

Rtype:

usually string or number or table, as determined by channel:put
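The following sketch, assuming a Tarantool instance, shows both outcomes of channel:get() with a timeout. The table used as a message and its field names are made up for the illustration.

```lua
local fiber = require('fiber')

local channel = fiber.channel(1)

-- The channel is empty: get() waits 0.1 seconds and returns nil.
local nothing = channel:get(0.1)

-- Once a message is in the channel, get() returns it without waiting.
channel:put({id = 1, action = 'resize'})
local task = channel:get(0.1)
```

Because nil is also the return value for a timeout and for a closed channel, a caller that needs to tell these cases apart can check channel:is_closed() after receiving nil.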

channel_object:is_empty()

Check whether the channel is empty (has no messages).

Return: true if the channel is empty. Otherwise false.
Rtype: boolean

channel_object:count()

Find out how many messages are in the channel.

Return: the number of messages.
Rtype: number

channel_object:is_full()

Check whether the channel is full.

Return: true if the channel is full (the number of messages in the channel equals the number of slots so there is no room for a new message). Otherwise false.
Rtype: boolean

channel_object:has_readers()

Check whether readers are waiting for a message because they have issued channel:get() and the channel is empty.

Return: true if readers are waiting. Otherwise false.
Rtype: boolean

channel_object:has_writers()

Check whether writers are waiting because they have issued channel:put() and the channel is full.

Return: true if writers are waiting. Otherwise false.
Rtype: boolean

channel_object:is_closed()

Check whether the channel is closed.

Return: true if the channel is already closed. Otherwise false.
Rtype: boolean
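The status functions above can be tried together in one short session. This is a sketch, assuming a Tarantool instance; the status table and the writer and reader fibers are names invented for the illustration.

```lua
local fiber = require('fiber')

local channel = fiber.channel(2)
local status = {}

status.empty_at_start = channel:is_empty()    -- no messages yet
channel:put('a')
channel:put('b')
status.count = channel:count()                -- two buffered messages
status.full = channel:is_full()               -- both slots are in use

-- A third put() has to wait, so the channel now has a waiting writer.
local writer = fiber.create(function()
    return channel:put('c')
end)
writer:set_joinable(true)
status.writers = channel:has_writers()

-- Drain the channel; the waiting writer's message is delivered last.
local drained = {channel:get(), channel:get(), channel:get()}
writer:join()

-- A get() on the empty channel has to wait, so there is a waiting reader.
local reader = fiber.create(function()
    return channel:get()
end)
reader:set_joinable(true)
status.readers = channel:has_readers()
channel:put('d')                              -- handed to the waiting reader
reader:join()

channel:close()
status.closed = channel:is_closed()
```

All of these checks describe the state at the moment of the call only. Another fiber may change that state as soon as the caller yields.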

1.10.1.1. Example

This example should give a rough idea of what some functions for fibers should look like. It’s assumed that the functions would be referenced in fiber.create().

fiber = require('fiber')
channel = fiber.channel(10)
function consumer_fiber()
    while true do
        local task = channel:get()
        ...
    end
end

function consumer2_fiber()
    while true do
        -- 10 seconds
        local task = channel:get(10)
        if task ~= nil then
            ...
        else
            -- timeout
        end
    end
end

function producer_fiber()
    while true do
        local task = box.space...:select{...}
        ...
        if channel:is_empty() then
            -- channel is empty
        end

        if channel:is_full() then
            -- channel is full
        end

        ...
        if channel:has_readers() then
            -- there are some fibers
            -- that are waiting for data
        end
        ...

        if channel:has_writers() then
            -- there are some fibers
            -- that are waiting for readers
        end
        channel:put(task)
    end
end

function producer2_fiber()
    while true do
        local task = box.space...:select{...}
        -- 10 seconds
        if channel:put(task, 10) then
            ...
        else
            -- timeout
        end
    end
end
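The outline above leaves out the task source and the processing step. For a version that can be run as is, here is a minimal sketch, assuming a Tarantool instance. The tasks are plain numbers, the processing step just doubles them, and the STOP marker is a convention invented for this sketch to tell the consumer that no more tasks will arrive.

```lua
local fiber = require('fiber')

local channel = fiber.channel(2)
local STOP = 'stop'        -- hypothetical end-of-stream marker
local processed = {}

-- Consumer: takes tasks from the channel until it sees the marker.
local consumer = fiber.create(function()
    while true do
        local task = channel:get()
        if task == STOP then
            break
        end
        table.insert(processed, task * 2)
    end
end)
consumer:set_joinable(true)

-- Producer (the main fiber here): put() waits whenever both slots
-- are in use, which gives the consumer a chance to run.
for i = 1, 5 do
    channel:put(i)
end
channel:put(STOP)

-- Wait for the consumer to finish the remaining tasks.
consumer:join()
```

A marker message is used here instead of channel:close() so that every task already placed on the channel is still delivered to the consumer before it stops.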

1.10.2. Condition variables

fiber.cond()

Create a new condition variable.

Return: new condition variable.
Rtype: Lua object

object cond_object
cond_object:wait([timeout])

Make the current fiber go to sleep, waiting until another fiber invokes the signal() or broadcast() method on the cond object. The sleep causes an implicit fiber.yield().

Parameters:
  • timeout – number of seconds to wait, default = forever.
Return:

If timeout is provided, and a signal doesn’t happen for the duration of the timeout, wait() returns false. If a signal or broadcast happens, wait() returns true.

Rtype:

boolean
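Both return values of wait() can be seen in a short sketch, assuming a Tarantool instance. The fiber named waiter is hypothetical and is made joinable only so that the main fiber can read the value returned by its wait() call.

```lua
local fiber = require('fiber')

local cond = fiber.cond()

-- Nobody signals the variable, so wait() returns false after 0.1 seconds.
local got_signal = cond:wait(0.1)

-- A second fiber waits on the same variable with a longer timeout.
local waiter = fiber.create(function()
    return cond:wait(5)
end)
waiter:set_joinable(true)

-- The signal arrives before the timeout, so that wait() returns true.
cond:signal()
local joined, signalled = waiter:join()
```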

cond_object:signal()

Wake up a single fiber that has executed wait() for the same variable.

Rtype: nil

cond_object:broadcast()

Wake up all fibers that have executed wait() for the same variable.

Rtype: nil
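The difference between signal() and broadcast() can be sketched with three waiting fibers, assuming a Tarantool instance. The counter woken and the waiters table are names made up for the illustration.

```lua
local fiber = require('fiber')

local cond = fiber.cond()
local woken = 0
local waiters = {}

-- Start three fibers that all wait on the same condition variable.
for i = 1, 3 do
    local f = fiber.create(function()
        cond:wait()
        woken = woken + 1
    end)
    f:set_joinable(true)
    waiters[i] = f
end

-- signal() wakes a single waiter; give it a moment to run.
cond:signal()
fiber.sleep(0.01)
local after_signal = woken

-- broadcast() wakes every fiber that is still waiting.
cond:broadcast()
for _, f in ipairs(waiters) do
    f:join()
end
local after_broadcast = woken
```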

1.10.2.1. Example

Assume that a Tarantool server is running and listening for connections on localhost port 3301. Assume that guest users have privileges to connect. We will use the tarantoolctl utility to start two clients.

On terminal #1, say

$ tarantoolctl connect '3301'
tarantool> fiber = require('fiber')
tarantool> cond = fiber.cond()
tarantool> cond:wait()

The call will hang because cond:wait(), without the optional timeout argument, sleeps until another fiber signals the condition variable.

On terminal #2, say

$ tarantoolctl connect '3301'
tarantool> cond:signal()

Now look again at terminal #1. It will show that the waiting stopped, and the cond:wait() function returned true.

This example depends on a global condition variable with the arbitrary name cond. In real life, programmers would make sure to use different condition variable names for different applications.