DatabaseQueue.php 8.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. <?php
  2. namespace Illuminate\Queue;
  3. use Illuminate\Support\Carbon;
  4. use Illuminate\Database\Connection;
  5. use Illuminate\Queue\Jobs\DatabaseJob;
  6. use Illuminate\Queue\Jobs\DatabaseJobRecord;
  7. use Illuminate\Contracts\Queue\Queue as QueueContract;
  8. class DatabaseQueue extends Queue implements QueueContract
  9. {
  10. /**
  11. * The database connection instance.
  12. *
  13. * @var \Illuminate\Database\Connection
  14. */
  15. protected $database;
  16. /**
  17. * The database table that holds the jobs.
  18. *
  19. * @var string
  20. */
  21. protected $table;
  22. /**
  23. * The name of the default queue.
  24. *
  25. * @var string
  26. */
  27. protected $default;
  28. /**
  29. * The expiration time of a job.
  30. *
  31. * @var int|null
  32. */
  33. protected $retryAfter = 60;
  34. /**
  35. * Create a new database queue instance.
  36. *
  37. * @param \Illuminate\Database\Connection $database
  38. * @param string $table
  39. * @param string $default
  40. * @param int $retryAfter
  41. * @return void
  42. */
  43. public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60)
  44. {
  45. $this->table = $table;
  46. $this->default = $default;
  47. $this->database = $database;
  48. $this->retryAfter = $retryAfter;
  49. }
  50. /**
  51. * Get the size of the queue.
  52. *
  53. * @param string $queue
  54. * @return int
  55. */
  56. public function size($queue = null)
  57. {
  58. return $this->database->table($this->table)
  59. ->where('queue', $this->getQueue($queue))
  60. ->count();
  61. }
  62. /**
  63. * Push a new job onto the queue.
  64. *
  65. * @param string $job
  66. * @param mixed $data
  67. * @param string $queue
  68. * @return mixed
  69. */
  70. public function push($job, $data = '', $queue = null)
  71. {
  72. return $this->pushToDatabase($queue, $this->createPayload($job, $data));
  73. }
  74. /**
  75. * Push a raw payload onto the queue.
  76. *
  77. * @param string $payload
  78. * @param string $queue
  79. * @param array $options
  80. * @return mixed
  81. */
  82. public function pushRaw($payload, $queue = null, array $options = [])
  83. {
  84. return $this->pushToDatabase($queue, $payload);
  85. }
  86. /**
  87. * Push a new job onto the queue after a delay.
  88. *
  89. * @param \DateTimeInterface|\DateInterval|int $delay
  90. * @param string $job
  91. * @param mixed $data
  92. * @param string $queue
  93. * @return void
  94. */
  95. public function later($delay, $job, $data = '', $queue = null)
  96. {
  97. return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay);
  98. }
  99. /**
  100. * Push an array of jobs onto the queue.
  101. *
  102. * @param array $jobs
  103. * @param mixed $data
  104. * @param string $queue
  105. * @return mixed
  106. */
  107. public function bulk($jobs, $data = '', $queue = null)
  108. {
  109. $queue = $this->getQueue($queue);
  110. $availableAt = $this->availableAt();
  111. return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
  112. function ($job) use ($queue, $data, $availableAt) {
  113. return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
  114. }
  115. )->all());
  116. }
  117. /**
  118. * Release a reserved job back onto the queue.
  119. *
  120. * @param string $queue
  121. * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
  122. * @param int $delay
  123. * @return mixed
  124. */
  125. public function release($queue, $job, $delay)
  126. {
  127. return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
  128. }
  129. /**
  130. * Push a raw payload to the database with a given delay.
  131. *
  132. * @param string|null $queue
  133. * @param string $payload
  134. * @param \DateTimeInterface|\DateInterval|int $delay
  135. * @param int $attempts
  136. * @return mixed
  137. */
  138. protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
  139. {
  140. return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
  141. $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
  142. ));
  143. }
  144. /**
  145. * Create an array to insert for the given job.
  146. *
  147. * @param string|null $queue
  148. * @param string $payload
  149. * @param int $availableAt
  150. * @param int $attempts
  151. * @return array
  152. */
  153. protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
  154. {
  155. return [
  156. 'queue' => $queue,
  157. 'attempts' => $attempts,
  158. 'reserved_at' => null,
  159. 'available_at' => $availableAt,
  160. 'created_at' => $this->currentTime(),
  161. 'payload' => $payload,
  162. ];
  163. }
  164. /**
  165. * Pop the next job off of the queue.
  166. *
  167. * @param string $queue
  168. * @return \Illuminate\Contracts\Queue\Job|null
  169. * @throws \Exception|\Throwable
  170. */
  171. public function pop($queue = null)
  172. {
  173. $queue = $this->getQueue($queue);
  174. return $this->database->transaction(function () use ($queue) {
  175. if ($job = $this->getNextAvailableJob($queue)) {
  176. return $this->marshalJob($queue, $job);
  177. }
  178. return null;
  179. });
  180. }
  181. /**
  182. * Get the next available job for the queue.
  183. *
  184. * @param string|null $queue
  185. * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
  186. */
  187. protected function getNextAvailableJob($queue)
  188. {
  189. $job = $this->database->table($this->table)
  190. ->lockForUpdate()
  191. ->where('queue', $this->getQueue($queue))
  192. ->where(function ($query) {
  193. $this->isAvailable($query);
  194. $this->isReservedButExpired($query);
  195. })
  196. ->orderBy('id', 'asc')
  197. ->first();
  198. return $job ? new DatabaseJobRecord((object) $job) : null;
  199. }
  200. /**
  201. * Modify the query to check for available jobs.
  202. *
  203. * @param \Illuminate\Database\Query\Builder $query
  204. * @return void
  205. */
  206. protected function isAvailable($query)
  207. {
  208. $query->where(function ($query) {
  209. $query->whereNull('reserved_at')
  210. ->where('available_at', '<=', $this->currentTime());
  211. });
  212. }
  213. /**
  214. * Modify the query to check for jobs that are reserved but have expired.
  215. *
  216. * @param \Illuminate\Database\Query\Builder $query
  217. * @return void
  218. */
  219. protected function isReservedButExpired($query)
  220. {
  221. $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
  222. $query->orWhere(function ($query) use ($expiration) {
  223. $query->where('reserved_at', '<=', $expiration);
  224. });
  225. }
  226. /**
  227. * Marshal the reserved job into a DatabaseJob instance.
  228. *
  229. * @param string $queue
  230. * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
  231. * @return \Illuminate\Queue\Jobs\DatabaseJob
  232. */
  233. protected function marshalJob($queue, $job)
  234. {
  235. $job = $this->markJobAsReserved($job);
  236. return new DatabaseJob(
  237. $this->container, $this, $job, $this->connectionName, $queue
  238. );
  239. }
  240. /**
  241. * Mark the given job ID as reserved.
  242. *
  243. * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
  244. * @return \Illuminate\Queue\Jobs\DatabaseJobRecord
  245. */
  246. protected function markJobAsReserved($job)
  247. {
  248. $this->database->table($this->table)->where('id', $job->id)->update([
  249. 'reserved_at' => $job->touch(),
  250. 'attempts' => $job->increment(),
  251. ]);
  252. return $job;
  253. }
  254. /**
  255. * Delete a reserved job from the queue.
  256. *
  257. * @param string $queue
  258. * @param string $id
  259. * @return void
  260. * @throws \Exception|\Throwable
  261. */
  262. public function deleteReserved($queue, $id)
  263. {
  264. $this->database->transaction(function () use ($id) {
  265. if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
  266. $this->database->table($this->table)->where('id', $id)->delete();
  267. }
  268. });
  269. }
  270. /**
  271. * Get the queue or return the default.
  272. *
  273. * @param string|null $queue
  274. * @return string
  275. */
  276. public function getQueue($queue)
  277. {
  278. return $queue ?: $this->default;
  279. }
  280. /**
  281. * Get the underlying database instance.
  282. *
  283. * @return \Illuminate\Database\Connection
  284. */
  285. public function getDatabase()
  286. {
  287. return $this->database;
  288. }
  289. }