Code Coverage

| | Classes and Traits | Functions and Methods | CRAP | Lines |
|---|---|---|---|---|
| Total | 0.00% (0 / 1) | 0.00% (0 / 8) | | 0.00% (0 / 48) |
| DatabaseQueue | 0.00% (0 / 1) | 0.00% (0 / 8) | 132 | 0.00% (0 / 48) |
| __construct | | 0.00% (0 / 1) | 2 | 0.00% (0 / 3) |
| createItem | | 0.00% (0 / 1) | 2 | 0.00% (0 / 8) |
| numberOfItems | | 0.00% (0 / 1) | 2 | 0.00% (0 / 2) |
| claimItem | | 0.00% (0 / 1) | 20 | 0.00% (0 / 19) |
| releaseItem | | 0.00% (0 / 1) | 2 | 0.00% (0 / 7) |
| deleteItem | | 0.00% (0 / 1) | 2 | 0.00% (0 / 4) |
| createQueue | | 0.00% (0 / 1) | 2 | 0.00% (0 / 1) |
| deleteQueue | | 0.00% (0 / 1) | 2 | 0.00% (0 / 4) |

| <?php | |
| /** | |
| * @file | |
| * Contains \Drupal\Core\Queue\DatabaseQueue. | |
| */ | |
| namespace Drupal\Core\Queue; | |
| use Drupal\Core\Database\Connection; | |
| use Drupal\Core\DependencyInjection\DependencySerializationTrait; | |
/**
 * Default queue implementation.
 *
 * Every queue shares the single {queue} database table; rows are told apart
 * by the queue name stored in the "name" column.
 *
 * @ingroup queue
 */
class DatabaseQueue implements ReliableQueueInterface {

  use DependencySerializationTrait;

  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * Constructs a \Drupal\Core\Queue\DatabaseQueue object.
   *
   * @param string $name
   *   The name of the queue.
   * @param \Drupal\Core\Database\Connection $connection
   *   The Connection object containing the queue table.
   */
  public function __construct($name, Connection $connection) {
    $this->name = $name;
    $this->connection = $connection;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    $query = $this->connection->insert('queue')
      ->fields([
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ]);
    // Return the new serial ID, or FALSE on failure.
    return $query->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    $count = $this->connection
      ->query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', [':name' => $this->name])
      ->fetchField();
    // The raw field value is not an integer (database drivers typically hand
    // back a numeric string), so normalize it before returning the count.
    return (int) $count;
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire field. If the claim is not
    // successful another thread may have claimed the item in the meantime.
    // Therefore loop until an item is successfully claimed or we are
    // reasonably sure there are no unclaimed items left.
    while (TRUE) {
      $item = $this->connection
        ->queryRange('SELECT data, created, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [':name' => $this->name])
        ->fetchObject();

      if (!$item) {
        // No items currently available to claim.
        return FALSE;
      }

      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = $this->connection->update('queue')
        ->fields([
          'expire' => time() + $lease_time,
        ])
        ->condition('item_id', $item->item_id)
        // Guard against a concurrent claim: only match a still-unclaimed row.
        ->condition('expire', 0);

      // If there are affected rows, this update succeeded.
      if ($update->execute()) {
        // The payload was stored with serialize() in createItem().
        $item->data = unserialize($item->data);
        return $item;
      }
      // Otherwise the row was claimed by someone else; try the next one.
    }
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    // Resetting expire to 0 makes the row eligible for claimItem() again.
    $update = $this->connection->update('queue')
      ->fields([
        'expire' => 0,
      ])
      ->condition('item_id', $item->item_id);
    return $update->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    $this->connection->delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created when
    // Drupal is first installed) so there is nothing we need to do to create
    // a new queue.
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {
    // Removes every item that belongs to this queue; other queues' rows in
    // the shared table are left alone.
    $this->connection->delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }

}