15. Tasks - doing heavy work in the background

Hint

The code snippets on this page need the following imports if you’re outside the pyqgis console:

    from qgis.core import (
        QgsProcessingContext,
        QgsTaskManager,
        QgsTask,
        QgsProcessingAlgRunnerTask,
        Qgis,
        QgsProcessingFeedback,
        QgsApplication,
        QgsMessageLog,
    )

15.1. Introduction

Background processing using threads is a way to maintain a responsive user interface when heavy processing is going on. Tasks can be used to achieve threading in QGIS.

A task (QgsTask) is a container for the code to be performed in the background, and the task manager (QgsTaskManager) is used to control the running of the tasks. These classes simplify background processing in QGIS by providing mechanisms for signaling, progress reporting and access to the status for background processes. Tasks can be grouped using subtasks.

The global task manager (found with QgsApplication.taskManager()) is normally used. This means that your tasks may not be the only tasks that are controlled by the task manager.

There are several ways to create a QGIS task:

  • Create your own task by extending QgsTask

    class SpecialisedTask(QgsTask):
        pass
    
  • Create a task from a function

    def heavyFunction(task):
        # The task itself is passed as the first argument.
        # Some CPU intensive processing ...
        pass

    def workdone(exception, result=None):
        # ... do something useful with the results
        pass

    task = QgsTask.fromFunction('heavy function', heavyFunction,
                                on_finished=workdone)
    
  • Create a task from a processing algorithm

    params = dict()
    context = QgsProcessingContext()
    feedback = QgsProcessingFeedback()

    buffer_alg = QgsApplication.instance().processingRegistry().algorithmById('native:buffer')
    task = QgsProcessingAlgRunnerTask(buffer_alg, params, context,
                                      feedback)
    

Warning

Any background task (regardless of how it is created) must NEVER use any QObject that lives on the main thread, for example by accessing a QgsVectorLayer or QgsProject, and must never perform GUI-based operations such as creating new widgets or interacting with existing ones. Qt widgets must only be accessed or modified from the main thread; attempting to use them from a background thread will result in crashes. Data that is used in a task must be copied before the task is started.
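The copy-before-start rule can be illustrated with a minimal sketch in plain Python threading. It does not use the QGIS API: live_rows is a hypothetical stand-in for values read from a layer on the main thread, and snapshot_rows and sum_lengths are illustrative names, not QGIS functions.

```python
import copy
import threading


def snapshot_rows(rows):
    """Deep-copy plain data on the calling (main) thread."""
    return copy.deepcopy(rows)


def sum_lengths(rows, out):
    """Worker: reads only its private copy, never the original data."""
    out['total'] = sum(row['length'] for row in rows)


# Hypothetical data standing in for values read from a layer
# on the main thread.
live_rows = [{'id': 1, 'length': 2.5}, {'id': 2, 'length': 4.0}]

# Take the copy BEFORE the background work starts.
private_rows = snapshot_rows(live_rows)

result = {}
worker = threading.Thread(target=sum_lengths, args=(private_rows, result))
worker.start()
# The main thread may keep editing its own data while the worker runs;
# the worker's copy is unaffected.
live_rows[0]['length'] = 100.0
worker.join()
```

The worker sums the values that existed when the snapshot was taken, regardless of later edits on the main thread. In a real task the same idea applies: extract plain Python values on the main thread and hand only those to the task.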

Dependencies between tasks can be described using the addSubTask() function of QgsTask. When a dependency is stated, the task manager will automatically determine the order in which the dependencies are executed. Wherever possible, dependencies will be executed in parallel in order to satisfy them as quickly as possible. If a task on which another task depends is canceled, the dependent task will also be canceled. Circular dependencies can cause deadlocks, so take care to avoid them.
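To make the idea of dependency ordering concrete, here is a small sketch using the standard library graphlib module (Python 3.9 or later). It is only an illustration of the concept and is not how QgsTaskManager is implemented; the task names and the run_in_batches helper are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def run_in_batches(dependencies):
    """Group task names into batches whose members could run in parallel.

    `dependencies` maps a task name to the set of names it depends on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the dependencies are circular
    batches = []
    while sorter.is_active():
        # Everything returned here has all of its dependencies satisfied.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


# 'merge' needs both downloads; 'report' needs 'merge'.
dependencies = {
    'merge': {'download_a', 'download_b'},
    'report': {'merge'},
}
batches = run_in_batches(dependencies)

# A circular dependency can never be satisfied.
try:
    run_in_batches({'a': {'b'}, 'b': {'a'}})
    circular = False
except CycleError:
    circular = True
```

Here the two downloads land in the first batch because neither depends on anything, while the circular pair is rejected up front instead of waiting forever.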

If a task depends on a layer being available, this can be stated using the setDependentLayers() function of QgsTask. If a layer on which a task depends is not available, the task will be canceled.

Once the task has been created, it can be scheduled for running using the addTask() function of the task manager. Adding a task to the manager automatically transfers ownership of that task to the manager, and the manager will clean up and delete tasks after they have executed. The scheduling of the tasks is influenced by the task priority, which is passed to addTask().
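As a rough sketch of scheduling with priorities, the helper below adds several tasks to a manager in one go. It assumes that addTask() accepts the priority as an optional second argument and returns an identifier for the task, and that larger priority values are scheduled first; schedule_by_priority is a hypothetical helper, not part of QGIS.

```python
def schedule_by_priority(manager, prioritized_tasks):
    """Add (task, priority) pairs to a task manager.

    Returns the identifiers reported by the manager. The tasks themselves
    are not kept, since the manager owns them once they have been added.
    """
    task_ids = []
    for task, priority in prioritized_tasks:
        # Assumption: addTask(task, priority) returns the task's id.
        task_ids.append(manager.addTask(task, priority))
    return task_ids
```

Inside QGIS this might be called as schedule_by_priority(QgsApplication.taskManager(), [(urgent_task, 10), (routine_task, 0)]), where urgent_task and routine_task are placeholder names for tasks created beforehand.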

The status of tasks can be monitored using QgsTask and QgsTaskManager signals and functions.
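A minimal sketch of monitoring a single task through its signals follows. It assumes the task exposes the begun, progressChanged, taskCompleted and taskTerminated signals; watch_task and its report callback are hypothetical names introduced here for illustration.

```python
def watch_task(task, report):
    """Connect a task's signals to `report`, a callable taking one string."""
    # Read the description once, so the callbacks do not need to touch
    # the task object after the manager has deleted it.
    name = task.description()
    task.begun.connect(lambda: report('{}: begun'.format(name)))
    task.progressChanged.connect(
        lambda progress: report('{}: {:.0f}% done'.format(name, progress)))
    task.taskCompleted.connect(lambda: report('{}: completed'.format(name)))
    task.taskTerminated.connect(lambda: report('{}: terminated'.format(name)))
```

The report callback could, for instance, forward each message to QgsMessageLog.logMessage(). Call watch_task() before handing the task to the task manager so that no early signals are missed.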

15.2. Examples

15.2.1. Extending QgsTask

In this example, RandomIntegerSumTask extends QgsTask and generates 100 random integers between 0 and 500 over a specified period of time, summing them as it goes. If one of the random integers is 42, the task is aborted and an exception is raised. Several instances of RandomIntegerSumTask (with subtasks) are generated and added to the task manager, demonstrating two types of dependencies.

    import random
    from time import sleep

    from qgis.core import (
        QgsApplication, QgsTask, QgsMessageLog, Qgis
    )

    MESSAGE_CATEGORY = 'RandomIntegerSumTask'

    class RandomIntegerSumTask(QgsTask):
        """This shows how to subclass QgsTask"""

        def __init__(self, description, duration):
            super().__init__(description, QgsTask.CanCancel)
            self.duration = duration
            self.total = 0
            self.iterations = 0
            self.exception = None

        def run(self):
            """Here you implement your heavy lifting.
            Should periodically test for isCanceled() to gracefully
            abort.
            This method MUST return True or False.
            Raising exceptions will crash QGIS, so we handle them
            internally and raise them in self.finished
            """
            QgsMessageLog.logMessage('Started task "{}"'.format(
                                         self.description()),
                                     MESSAGE_CATEGORY, Qgis.Info)
            wait_time = self.duration / 100
            for i in range(100):
                sleep(wait_time)
                # use setProgress to report progress
                self.setProgress(i)
                arandominteger = random.randint(0, 500)
                self.total += arandominteger
                self.iterations += 1
                # check isCanceled() to handle cancellation
                if self.isCanceled():
                    return False
                # simulate exceptions to show how to abort task
                if arandominteger == 42:
                    # DO NOT raise Exception('bad value!')
                    # this would crash QGIS
                    self.exception = Exception('bad value!')
                    return False
            return True

        def finished(self, result):
            """
            This function is automatically called when the task has
            completed (successfully or not).
            You implement finished() to do whatever follow-up stuff
            should happen after the task is complete.
            finished is always called from the main thread, so it's safe
            to do GUI operations and raise Python exceptions here.
            result is the return value from self.run.
            """
            if result:
                QgsMessageLog.logMessage(
                    'RandomTask "{name}" completed\n'
                    'RandomTotal: {total} (with {iterations} '
                    'iterations)'.format(
                        name=self.description(),
                        total=self.total,
                        iterations=self.iterations),
                    MESSAGE_CATEGORY, Qgis.Success)
            else:
                if self.exception is None:
                    QgsMessageLog.logMessage(
                        'RandomTask "{name}" not successful but without '
                        'exception (probably the task was manually '
                        'canceled by the user)'.format(
                            name=self.description()),
                        MESSAGE_CATEGORY, Qgis.Warning)
                else:
                    QgsMessageLog.logMessage(
                        'RandomTask "{name}" Exception: {exception}'.format(
                            name=self.description(),
                            exception=self.exception),
                        MESSAGE_CATEGORY, Qgis.Critical)
                    raise self.exception

        def cancel(self):
            QgsMessageLog.logMessage(
                'RandomTask "{name}" was canceled'.format(
                    name=self.description()),
                MESSAGE_CATEGORY, Qgis.Info)
            super().cancel()


    longtask = RandomIntegerSumTask('waste cpu long', 20)
    shorttask = RandomIntegerSumTask('waste cpu short', 10)
    minitask = RandomIntegerSumTask('waste cpu mini', 5)
    shortsubtask = RandomIntegerSumTask('waste cpu subtask short', 5)
    longsubtask = RandomIntegerSumTask('waste cpu subtask long', 10)
    shortestsubtask = RandomIntegerSumTask('waste cpu subtask shortest', 4)

    # Add a subtask (shortsubtask) to shorttask that must run after
    # minitask and longtask have finished
    shorttask.addSubTask(shortsubtask, [minitask, longtask])
    # Add a subtask (longsubtask) to longtask that must be run
    # before the parent task
    longtask.addSubTask(longsubtask, [], QgsTask.ParentDependsOnSubTask)
    # Add a subtask (shortestsubtask) to longtask
    longtask.addSubTask(shortestsubtask)

    QgsApplication.taskManager().addTask(longtask)
    QgsApplication.taskManager().addTask(shorttask)
    QgsApplication.taskManager().addTask(minitask)

When the tasks run, the message log shows output similar to the following:

    RandomIntegerSumTask(0): Started task "waste cpu subtask shortest"
    RandomIntegerSumTask(0): Started task "waste cpu short"
    RandomIntegerSumTask(0): Started task "waste cpu mini"
    RandomIntegerSumTask(0): Started task "waste cpu subtask long"
    RandomIntegerSumTask(3): Task "waste cpu subtask shortest" completed
    RandomTotal: 25452 (with 100 iterations)
    RandomIntegerSumTask(3): Task "waste cpu mini" completed
    RandomTotal: 23810 (with 100 iterations)
    RandomIntegerSumTask(3): Task "waste cpu subtask long" completed
    RandomTotal: 26308 (with 100 iterations)
    RandomIntegerSumTask(0): Started task "waste cpu long"
    RandomIntegerSumTask(3): Task "waste cpu long" completed
    RandomTotal: 22534 (with 100 iterations)

15.2.2. Task from function

Create a task from a function (doSomething in this example). The first parameter of the function receives the QgsTask that wraps it. An important named parameter is on_finished, which specifies a function to be called when the task has completed. The doSomething function in this example has an additional named parameter, wait_time.

    import random
    from time import sleep

    from qgis.core import QgsApplication, QgsTask, QgsMessageLog, Qgis

    MESSAGE_CATEGORY = 'TaskFromFunction'

    def doSomething(task, wait_time):
        """
        Raises an exception to abort the task.
        Returns a result if success.
        The result will be passed, together with the exception (None in
        the case of success), to the on_finished method.
        If there is an exception, there will be no result.
        """
        QgsMessageLog.logMessage('Started task {}'.format(task.description()),
                                 MESSAGE_CATEGORY, Qgis.Info)
        wait_time = wait_time / 100
        total = 0
        iterations = 0
        for i in range(100):
            sleep(wait_time)
            # use task.setProgress to report progress
            task.setProgress(i)
            arandominteger = random.randint(0, 500)
            total += arandominteger
            iterations += 1
            # check task.isCanceled() to handle cancellation
            if task.isCanceled():
                stopped(task)
                return None
            # raise an exception to abort the task
            if arandominteger == 42:
                raise Exception('bad value!')
        return {'total': total, 'iterations': iterations,
                'task': task.description()}

    def stopped(task):
        QgsMessageLog.logMessage(
            'Task "{name}" was canceled'.format(
                name=task.description()),
            MESSAGE_CATEGORY, Qgis.Info)

    def completed(exception, result=None):
        """This is called when doSomething is finished.
        Exception is not None if doSomething raises an exception.
        result is the return value of doSomething."""
        if exception is None:
            if result is None:
                QgsMessageLog.logMessage(
                    'Completed with no exception and no result '
                    '(probably manually canceled by the user)',
                    MESSAGE_CATEGORY, Qgis.Warning)
            else:
                QgsMessageLog.logMessage(
                    'Task {name} completed\n'
                    'Total: {total} ( with {iterations} '
                    'iterations)'.format(
                        name=result['task'],
                        total=result['total'],
                        iterations=result['iterations']),
                    MESSAGE_CATEGORY, Qgis.Info)
        else:
            QgsMessageLog.logMessage("Exception: {}".format(exception),
                                     MESSAGE_CATEGORY, Qgis.Critical)
            raise exception

    # Create a few tasks
    task1 = QgsTask.fromFunction('Waste cpu 1', doSomething,
                                 on_finished=completed, wait_time=4)
    task2 = QgsTask.fromFunction('Waste cpu 2', doSomething,
                                 on_finished=completed, wait_time=3)
    QgsApplication.taskManager().addTask(task1)
    QgsApplication.taskManager().addTask(task2)

When the tasks run, the message log shows output similar to the following:

    RandomIntegerSumTask(0): Started task "waste cpu subtask short"
    RandomTaskFromFunction(0): Started task Waste cpu 1
    RandomTaskFromFunction(0): Started task Waste cpu 2
    RandomTaskFromFunction(0): Task Waste cpu 2 completed
    RandomTotal: 23263 ( with 100 iterations)
    RandomTaskFromFunction(0): Task Waste cpu 1 completed
    RandomTotal: 25044 ( with 100 iterations)
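The calling convention used in this example (the task is passed as the first argument, and on_finished receives the exception followed by an optional result) can be modelled in plain Python. FunctionJob below is a simplified, hypothetical stand-in written for illustration only. It is not the QGIS implementation and it runs synchronously instead of in a background thread.

```python
class FunctionJob:
    """Simplified stand-in for a function-based task (not the QGIS class)."""

    def __init__(self, function, on_finished=None, **kwargs):
        self.function = function
        self.on_finished = on_finished
        self.kwargs = kwargs
        self.exception = None
        self.result = None

    def run(self):
        """Call the function with the job itself as the first argument."""
        try:
            self.result = self.function(self, **self.kwargs)
        except Exception as ex:
            # Store the exception rather than letting it propagate.
            self.exception = ex
            return False
        return True

    def finished(self):
        """Report back: exception first, then the result if there is one."""
        if self.on_finished is None:
            return
        if self.result is not None:
            self.on_finished(self.exception, self.result)
        else:
            self.on_finished(self.exception)


def work(job, fail):
    if fail:
        raise ValueError('bad value!')
    return {'total': 3}


calls = []


def completed(exception, result=None):
    calls.append((exception, result))


for fail in (False, True):
    job = FunctionJob(work, on_finished=completed, fail=fail)
    job.run()
    job.finished()
```

The first call to completed receives no exception and the returned dictionary; the second receives the ValueError and no result. This is why the on_finished function gives result a default value of None.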

15.2.3. Task from a processing algorithm

Create a task that uses the algorithm qgis:randompointsinextent to generate 50000 random points inside a specified extent. The result is added to the project in a safe way.

    from functools import partial
    from qgis.core import (Qgis, QgsMessageLog,
                           QgsProcessingAlgRunnerTask, QgsApplication,
                           QgsProcessingContext, QgsProcessingFeedback,
                           QgsProject)

    MESSAGE_CATEGORY = 'AlgRunnerTask'

    def task_finished(context, successful, results):
        if not successful:
            QgsMessageLog.logMessage('Task finished unsuccessfully',
                                     MESSAGE_CATEGORY, Qgis.Warning)
            # there is no output layer to add to the project
            return
        output_layer = context.getMapLayer(results['OUTPUT'])
        # because getMapLayer doesn't transfer ownership, the layer will
        # be deleted when context goes out of scope and you'll get a
        # crash.
        # takeResultLayer transfers ownership so it's then safe to add it
        # to the project and give the project ownership.
        if output_layer and output_layer.isValid():
            QgsProject.instance().addMapLayer(
                 context.takeResultLayer(output_layer.id()))

    alg = QgsApplication.processingRegistry().algorithmById(
                                          'qgis:randompointsinextent')
    # `context` and `feedback` need to
    # live for at least as long as `task`,
    # otherwise the program will crash.
    # Initializing them globally is a sure way
    # of avoiding this unfortunate situation.
    context = QgsProcessingContext()
    feedback = QgsProcessingFeedback()
    params = {
        'EXTENT': '0.0,10.0,40,50 [EPSG:4326]',
        'MIN_DISTANCE': 0.0,
        'POINTS_NUMBER': 50000,
        'TARGET_CRS': 'EPSG:4326',
        'OUTPUT': 'memory:My random points'
    }
    task = QgsProcessingAlgRunnerTask(alg, params, context, feedback)
    task.executed.connect(partial(task_finished, context))
    QgsApplication.taskManager().addTask(task)

See also: https://www.opengis.ch/2018/06/22/threads-in-pyqgis3/.