内容

名称

Thread::Queue - 线程安全队列

版本

本文档描述了 Thread::Queue 版本 3.14

概要

use strict;
use warnings;

use threads;
use Thread::Queue;

my $q = Thread::Queue->new();    # A new empty queue

# Worker thread
my $thr = threads->create(
    sub {
        # Thread will loop until no more work
        while (defined(my $item = $q->dequeue())) {
            # Do work on $item
            ...
        }
    }
);

# Send work to the thread
$q->enqueue($item1, ...);
# Signal that there is no more work to be sent
$q->end();
# Join up with the thread when it finishes
$thr->join();

...

# Count of items in the queue
my $left = $q->pending();

# Non-blocking dequeue
if (defined(my $item = $q->dequeue_nb())) {
    # Work on $item
}

# Blocking dequeue with 5-second timeout
if (defined(my $item = $q->dequeue_timed(5))) {
    # Work on $item
}

# Set a size for a queue
$q->limit = 5;

# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);

# Insert two items into the queue just behind the head
$q->insert(1, $item1, $item2);

# Extract the last two items on the queue
my ($item1, $item2) = $q->extract(-2, 2);

描述

此模块提供线程安全的 FIFO 队列,可以被任意数量的线程安全地访问。

任何由 threads::shared 支持的数据类型都可以通过队列传递

普通标量
数组引用
哈希引用
标量引用
基于上述的的对象

普通标量按原样添加到队列中。

如果数据类型尚未共享线程,则其他复杂数据类型将在放入队列之前被克隆(如果需要,递归克隆,包括任何bless和只读设置)到线程共享结构中。

例如,以下操作将导致Thread::Queue通过&shared([])创建一个空的共享数组引用,将@ary中的元素'foo'、'bar'和'baz'复制到其中,然后将该共享引用放入队列。

my @ary = qw/foo bar baz/;
$q->enqueue(\@ary);

但是,对于以下情况,这些项目已经是共享的,因此它们的引用直接添加到队列中,不会进行克隆。

my @ary :shared = qw/foo bar baz/;
$q->enqueue(\@ary);

my $obj = &shared({});
$$obj{'foo'} = 'bar';
$$obj{'qux'} = 99;
bless($obj, 'My::Class');
$q->enqueue($obj);

有关通过队列传递对象的相关注意事项,请参见"LIMITATIONS"

队列创建

->new()

创建一个新的空队列。

->new(LIST)

创建一个新的队列,预先填充提供的项目列表。

基本方法

以下方法以 FIFO 方式处理队列。

->enqueue(LIST)

在队列末尾添加项目列表。

->dequeue()
->dequeue(COUNT)

从队列头部移除请求数量的项目(默认值为 1),并返回它们。如果队列中包含的项目少于请求数量,则线程将被阻塞,直到有足够的项目可用(即,直到其他线程enqueue更多项目)。

->dequeue_nb()
->dequeue_nb(COUNT)

从队列头部移除请求数量的项目(默认值为 1),并返回它们。如果队列中包含的项目少于请求数量,则它会立即(即非阻塞)返回队列中的所有项目。如果队列为空,则返回undef

->dequeue_timed(TIMEOUT)
->dequeue_timed(TIMEOUT, COUNT)

从队列头部移除请求数量的项目(默认值为 1),并返回它们。如果队列中包含的项目少于请求数量,则线程将被阻塞,直到有足够的项目可用,或者直到超时。如果超时,则返回队列中存在的任何项目,或者如果队列为空,则返回 undef

超时可以是相对于当前时间的秒数(例如,从调用开始的 5 秒),也可以是与 cond_timedwait() 相同的以纪元秒为单位的绝对超时。也支持小数秒(例如,2.5 秒)(在底层实现的范围内)。

如果 TIMEOUT 缺失、为 undef 或小于或等于 0,则此调用与 dequeue_nb 的行为相同。

->pending()

返回队列中剩余的项目数量。如果队列已结束(见下文),并且队列中没有更多项目,则返回 undef

->limit

设置队列的大小。如果设置,对 enqueue() 的调用将被阻塞,直到队列中待处理项目的数量降至 limit 以下。limit 不会阻止在该计数之外的项目入队。

my $q = Thread::Queue->new(1, 2);
$q->limit = 4;
$q->enqueue(3, 4, 5);   # Does not block
$q->enqueue(6);         # Blocks until at least 2 items are
                        # dequeued
my $size = $q->limit;   # Returns the current limit (may return
                        # 'undef')
$q->limit = 0;          # Queue size is now unlimited

使用大于队列 limitCOUNT 调用任何 dequeue 方法将生成错误。

->end()

声明不再向队列添加项目。

所有在 dequeue() 调用上阻塞的线程都将被解除阻塞,并返回队列中剩余的任何项目和/或 undef。对 dequeue() 的任何后续调用将与 dequeue_nb() 的行为相同。

结束之后,不能再将项目放入队列。

高级方法

以下方法可用于操作队列中的任何项目。

为了防止在检查和/或更改队列内容时被其他线程修改,请在本地块内 锁定 队列。

{
    lock($q);   # Keep other threads from changing the queue's contents
    my $item = $q->peek();
    if ($item ...) {
        ...
    }
}
# Queue is now unlocked
->peek()
->peek(INDEX)

从队列中返回一个项目,但不进行出队操作。如果未指定索引,则默认为队列头部(索引位置 0)。负索引值与 数组 一样受支持(即,-1 是队列的末尾,-2 是倒数第二个,依此类推)。

如果在指定索引处不存在项目(即队列为空,或索引超出队列中项目的数量),则返回 undef

请记住,返回的项目不会从队列中删除,因此操作peeked 的引用会影响队列中的项目。

->insert(INDEX, LIST)

在指定索引位置(0 是列表的头部)将项目列表添加到队列中。该位置及以后的任何现有项目都会被推送到新添加的项目之后。

$q->enqueue(1, 2, 3, 4);
$q->insert(1, qw/foo bar/);
# Queue now contains:  1, foo, bar, 2, 3, 4

指定大于队列中项目数量的索引位置只会将列表添加到末尾。

支持负索引位置。

$q->enqueue(1, 2, 3, 4);
$q->insert(-2, qw/foo bar/);
# Queue now contains:  1, 2, foo, bar, 3, 4

指定大于队列中项目数量的负索引位置会将列表添加到队列的头部。

->extract()
->extract(INDEX)
->extract(INDEX, COUNT)

从队列中指定索引位置(0 是队列的头部)删除并返回指定数量的项目(默认为 1)。当不带任何参数调用时,extract 的操作与 dequeue_nb 相同。

此方法是非阻塞的,并且只返回满足请求的可用项目数量。

$q->enqueue(1, 2, 3, 4);
my $item  = $q->extract(2)     # Returns 3
                               # Queue now contains:  1, 2, 4
my @items = $q->extract(1, 3)  # Returns (2, 4)
                               # Queue now contains:  1

指定大于队列中项目数量的索引位置会导致返回 undef 或空列表。

$q->enqueue('foo');
my $nada = $q->extract(3)      # Returns undef
my @nada = $q->extract(1, 3)   # Returns ()

支持负索引位置。指定大于队列中项目数量的负索引位置可能会从队列的头部返回项目(类似于 dequeue_nb),如果计数从指定位置重叠队列的头部(即如果队列大小 + 索引 + 计数大于零)。

$q->enqueue(qw/foo bar baz/);
my @nada = $q->extract(-6, 2);  # Returns ()      - (3+(-6)+2) <= 0
my @some = $q->extract(-6, 4);  # Returns (foo)   - (3+(-6)+4) > 0
                                # Queue now contains:  bar, baz
my @rest = $q->extract(-3, 4);  # Returns (bar, baz) -
                                #                   (2+(-3)+4) > 0

NOTES

Thread::Queue 创建的队列可以在线程化和非线程化应用程序中使用。

LIMITATIONS

如果对象的类不支持共享,则在队列上传递对象可能无法正常工作。有关更多信息,请参阅 "threads::shared 中的错误和限制"

对于 5.10.0 之前的 Perl 版本,传递包含对象的数组/哈希引用可能无法正常工作。

SEE ALSO

MetaCPAN 上的 Thread::Queue:https://metacpan.org/release/Thread-Queue

CPAN 发行版的代码库:https://github.com/Dual-Life/Thread-Queue

threadsthreads::shared

CPAN 上此发行版 examples 目录中的示例代码。

维护者

Jerry D. Hedden,<jdhedden AT cpan DOT org>

许可证

此程序是自由软件;您可以根据与 Perl 本身相同的条款重新分发和/或修改它。