JobQueueService

JobQueueService

The JobQueueService is used to create new JobQueue instances and access existing jobs.

Example

// A service which transcodes video files
class VideoTranscoderService {

  private jobQueue: JobQueue<{ videoId: string; }>;

  async onModuleInit() {
    // The JobQueue is created on initialization
    this.jobQueue = await this.jobQueueService.createQueue({
      name: 'transcode-video',
      process: async job => {
        return await this.transcodeVideo(job.data.videoId);
      },
    });
  }

  addToTranscodeQueue(videoId: string) {
    this.jobQueue.add({ videoId, })
  }

  private async transcodeVideo(videoId: string) {
    // e.g. call some external transcoding service
  }

}

Signature

class JobQueueService implements OnModuleDestroy {
  constructor(configService: ConfigService, jobBufferService: JobBufferService)
  async createQueue(options: CreateQueueOptions<Data>) => Promise<JobQueue<Data>>;
  async start() => Promise<void>;
  addBuffer(buffer: JobBuffer<any>) => ;
  removeBuffer(buffer: JobBuffer<any>) => ;
  bufferSize(forBuffers: Array<JobBuffer<any> | string>) => Promise<{ [bufferId: string]: number }>;
  flush(forBuffers: Array<JobBuffer<any> | string>) => Promise<Job[]>;
  getJobQueues() => GraphQlJobQueue[];
}

Implements

  • OnModuleDestroy

Members

constructor

method
type:
(configService: ConfigService, jobBufferService: JobBufferService) => JobQueueService

createQueue

async method
type:
(options: CreateQueueOptions<Data>) => Promise<JobQueue<Data>>
Configures and creates a new JobQueue instance.

start

async method
type:
() => Promise<void>

addBuffer

method
v1.3.0
type:
(buffer: JobBuffer<any>) =>
Adds a JobBuffer, which will make it active and begin collecting jobs to buffer.

removeBuffer

method
v1.3.0
type:
(buffer: JobBuffer<any>) =>
Removes a JobBuffer, prevent it from collecting and buffering any subsequent jobs.

bufferSize

method
v1.3.0
type:
(forBuffers: Array<JobBuffer<any> | string>) => Promise<{ [bufferId: string]: number }>

Returns an object containing the number of buffered jobs arranged by bufferId. This can be used to decide whether a particular buffer has any jobs to flush.

Passing in JobBuffer instances or ids limits the results to the specified JobBuffers. If no argument is passed, sizes will be returned for all JobBuffers.

Example

const sizes = await this.jobQueueService.bufferSize('buffer-1', 'buffer-2');

// sizes = { 'buffer-1': 12, 'buffer-2': 3 }

flush

method
v1.3.0
type:
(forBuffers: Array<JobBuffer<any> | string>) => Promise<Job[]>

Flushes the specified buffers, which means that the buffer is cleared and the jobs get sent to the job queue for processing. Before sending the jobs to the job queue, they will be passed through each JobBuffer’s reduce() method, which is can be used to optimize the amount of work to be done by e.g. de-duplicating identical jobs or aggregating data over the collected jobs.

Passing in JobBuffer instances or ids limits the action to the specified JobBuffers. If no argument is passed, all JobBuffers will be flushed.

Returns an array of all Jobs which were added to the job queue.

getJobQueues

method
type:
() => GraphQlJobQueue[]
Returns an array of { name: string; running: boolean; } for each registered JobQueue.