Executing Data Tasks#
DUFT’s Data Task Engine (DTE) provides a flexible framework for automating Extract, Transform, Load (ETL) processes and other data-related tasks. These tasks are defined within DUFT Config and executed by DUFT Server, supporting both Python scripts and Jupyter Notebooks.
While Data Tasks primarily handle ETL processes, they are also used for a range of other operations, including:
Execute ETL – Perform ETL and store results in an optimised Analysis database, including restructuring name-value schemas into standard tabular formats.
Fetch data – Retrieve data from an upstream source, such as a central viral load repository. Retrieved data can be integrated into a source system (e.g., an EMR) or, if direct integration is impractical, stored in the Analysis database alongside existing data.
Submit data – Send data to an upstream system, such as a monthly report to DHIS2 or a DUFT Server. The task aggregates and formats data before transmission.
Update DUFT Config – Pull the configuration directly from GitHub, enabling users to update dashboard definitions and data tasks dynamically without having to re-install DUFT.
Run custom scripts – Execute any other Python-based command through a custom script.
Key Engineering Decisions: Data Tasks#
Several key engineering decisions shaped the design and implementation of Data Tasks:
Data-first approach – DUFT prioritises data transformation, choosing Python and Jupyter, the industry standards for ETL and data science tasks.
Decoupled development – Data engineers can develop scripts independently, without deep knowledge of DUFT’s internals.
Real-time feedback loops – Long-running tasks require user visibility. DUFT uses Server-Sent Events (SSE) to stream logs, progress updates, and errors to the UI, improving transparency.
Configurable execution with parameters – Data Tasks can take preset parameters, user-selected parameters from dashboards, or manual inputs (planned feature), ensuring flexibility.
Asynchronous execution – Tasks run in the background to prevent blocking system operations.
Defining Data Tasks#
Data Tasks are defined in data_tasks.json within DUFT Config. Each entry includes:
ID and Name – A unique identifier and a descriptive name.
Python File – The script or notebook to execute.
Data Connections – Defines which databases or APIs the task interacts with.
Parameters – Default values and user-overridable settings.
Here is an example of an entry in a data_tasks.json file:
// Data Task Sample
// data_tasks.json
{
  "id": "ROOT-SAMPLE",
  "name": "Sample Data Task executed from the Root",
  "description": "A sample data task that does nothing but needs to be executed from the root",
  "pythonFile": "sample_task_from_root.py",
  "hideInUI": true,
  "executeFromRoot": true,
  "dataConnections": [
    "ANA",
    "MyOwnConnection"
  ],
  "supportedParameters": {
    "repo": {
      "title": "Repository",
      "description": "The git repository.",
      "defaultValue": "repo-git",
      "allowOverride": true,
      "promptUser": false
    },
    "name": {
      "title": "Name",
      "description": "The name of the data task.",
      "defaultValue": "ROOT DATA TASK",
      "allowOverride": true,
      "promptUser": false
    },
    "destinationDirectory": {
      "title": "Destination Directory",
      "description": "The directory where the data will be saved.",
      "defaultValue": "duft-config-test",
      "allowOverride": true,
      "promptUser": false
    },
    "sleep_time": {
      "title": "Sleep Time",
      "description": "Time to wait between execution steps in seconds.",
      "defaultValue": 0.5,
      "allowOverride": true,
      "promptUser": false
    }
  }
},
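Note the optional hideInUI and executeFromRoot flags in this sample: the task is hidden from the UI's task list, and its script is copied to the root execution directory before it runs (see Script Preparation in the execution pipeline below).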
Parameters#
Data Tasks can be provided with parameters. DUFT supports three types of parameters:
Fixed parameters – Hardcoded in data_tasks.json, ideal for global configurations.
Dashboard-linked parameters – Selected dynamically via dashboard filters.
User-input parameters (Planned) – A UI dialog box will allow users to manually input values before execution.
Currently, the API supports all parameter types, but UI integration is still evolving.
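However they are set, parameters reach a task script as a single JSON argument on the command line. Here is a minimal sketch of reading them, using the helpers from api_data_task_executioner.data_task_tools that appear in the sample task at the end of this section ("My Task" is a placeholder name):
# Reading parameters inside a task script (sketch)
import sys
from api_data_task_executioner.data_task_tools import find_json_arg, initialise_data_task

json_args = find_json_arg(sys.argv)  # parameters arrive as a JSON argv entry
environment = initialise_data_task("My Task", params=json_args)

# Fall back to the script's own default when a parameter was not supplied
sleep_time = json_args.get("sleep_time", 0.5)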
Connecting to Data Sources#
Data Tasks typically interact with databases or APIs, requiring connection details such as server credentials, API endpoints, or authentication keys. DUFT handles this via Data Connections, stored in data_connections.json. Here is an example:
// Data Connection Sample
// data_connections.json
{
  "id": "ANA",
  "name": "Analysis Database",
  "description": "Stores all analytical data.",
  "params": [
    {
      "name": "type",
      "type": "text",
      "label": "DB Type"
    },
    {
      "name": "server",
      "type": "text",
      "label": "Server Name"
    },
    {
      "name": "database",
      "type": "text",
      "label": "Database Name"
    },
    …
  ]
}
The DUFT UI presents users with a form for configuring these parameters.
Connection settings are stored in user/config.json, keeping user-configurable values out of version-controlled files, as shown here:
// Data Connection Parameters set by user
// duft-config/user/config.json
{
  "dataConnectionParameters": {
    "ANA": {
      "server": "127.0.0.1",
      "username": "user",
      "password": "mysecretword",
      "port": "5432",
      "database": "analysis",
      "sqlite3file": "wakanda.sqlite",
      "type": "sqlite3"
    },
    …
  }
}
Beyond databases, Data Connections can store API parameters for national repositories, cloud services, or custom integrations. When a Data Task executes, DUFT dynamically retrieves the necessary Data Connection parameters and passes them to the task.
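For example, here is a minimal sketch of a task resolving the ANA connection, assuming get_resolved_parameters_for_connection (used in the sample task below) returns the parameter dictionary configured in user/config.json:
# Resolving a Data Connection inside a task (sketch)
import sqlite3
from api_data_task_executioner.data_task_tools import get_resolved_parameters_for_connection

conn_params = get_resolved_parameters_for_connection("ANA")
if conn_params.get("type") == "sqlite3":
    db = sqlite3.connect(conn_params["sqlite3file"])  # e.g. "wakanda.sqlite"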
Scheduling and Monitoring#
Tasks can be started manually from the UI, or scheduled using cron-based definitions in config.json, allowing automated execution at predefined intervals:
"scheduledDataTasks": {
"SAMPLE": {
"description": "Scheduled sample task",
"schedule": {
"cron": {
"second": "*/30",
"minute": "*",
"hour": "*",
"day_of_month": "*",
"month": "*",
"day_of_week": "*"
}
}
}
}
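In this example, the SAMPLE task runs every 30 seconds ("second": "*/30").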
Updating the UI when a Data Task runs#
When a Data Task executes, DUFT streams live execution logs to the UI via Server-Sent Events (SSE). As the task progresses, real-time status updates—including progress, errors, and completion—are pushed to the UI, allowing users to monitor execution without refreshing the page. This ensures a transparent execution process, particularly for long-running ETL operations.
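Any SSE-capable client can subscribe to this stream. As a minimal sketch, assuming a hypothetical endpoint URL (consult your DUFT Server for the actual path):
# Consuming the DTE's SSE stream (sketch; the URL is hypothetical)
import requests

with requests.get("http://localhost:8000/api/data-tasks/SAMPLE/events", stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data:"):
            print(line[len("data:"):].strip())  # one log/progress message per event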
Certain tasks may modify datasets or leave tables in an intermediate state, so rendering dashboards during execution could result in incomplete or incorrect data. To prevent such inconsistencies, DUFT UI temporarily disables interactions with affected dashboards while a Data Task is running, ensuring that users only view stable, post-task results.
Data Task engineers can add log messages to their Data Task scripts; these are broadcast to the UI, enabling users to follow progress easily:
environment.log_message("Task started...")
time.sleep(1)
environment.log_message("Processing data...")
time.sleep(1)
environment.log_message("Task completed successfully!")
Reporting an Error#
environment.log_error("An unexpected error occurred!")
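A task that must abort should also exit with a non-zero status code after logging the error, so the DTE reports the run as failed; the sample task below demonstrates this pattern.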
The Data Task Engine#
Data Tasks are executed by the Data Task Engine (DTE), which is responsible for asynchronously executing the Data Tasks (i.e. Python scripts and Jupyter Notebooks) stored in DUFT Config, providing an easy way to run them in the background.
Execution Pipeline#
The DTE processes incoming task execution requests through the following pipeline:
Task Retrieval – The requested data task is retrieved from DUFT Config.
Script Preparation – The task script is located and validated. If it needs to be copied to a root execution directory, this step ensures that happens.
Parameter Resolution – User-specified inputs are merged with defaults, and the required data connection parameters are retrieved.
Task Execution – The script is executed in a separate process, ensuring it does not block other operations (see the sketch after this list).
Real-Time Logging – Execution logs and progress updates are broadcast to the UI via server-sent events.
Completion Handling – Once execution is finished, a success or failure message is returned.
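As an illustrative sketch of steps 3–5 (not the actual implementation), the DTE might launch a script in a separate process and pass the resolved parameters as a JSON argument, which the script reads back with find_json_arg(sys.argv):
# Launching a task in a separate process (illustrative sketch)
import json
import subprocess
import sys

params = {"name": "ROOT DATA TASK", "sleep_time": 0.5}  # resolved parameters
proc = subprocess.Popen(
    [sys.executable, "sample_task_from_root.py", json.dumps(params)],
    stdout=subprocess.PIPE, text=True,
)
for line in proc.stdout:  # each line can be broadcast to the UI via SSE
    print(line, end="")
exit_code = proc.wait()  # non-zero signals failure to Completion Handling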
Sample Data Task#
DUFT Config comes with the following sample data task:
import time
import sys
import os

from api_data_task_executioner.data_task_tools import assert_dte_tools_available, get_resolved_parameters_for_connection, initialise_data_task, find_json_arg, DataTaskEnvironment

params = {}
environment: DataTaskEnvironment = None

if __name__ == "__main__":
    json_args = find_json_arg(sys.argv)
    environment = initialise_data_task("Sample Data Task", params=json_args)
    params["name"] = json_args.get("name", "No parameters given!")
    params["sleep_time"] = json_args.get("sleep_time", 0.2)
    params["simulate_error"] = json_args.get("simulate_error", False)
    if not json_args:
        environment.log_error("No parameters given!")

def sample_task():
    resolved_parameters = get_resolved_parameters_for_connection("ANA")
    environment.log_message('Script starting!')
    environment.log_message("Using Data Connection Parameters: %s" % resolved_parameters)
    for i in range(10):
        # Simulate a long-running task
        time.sleep(params["sleep_time"])
        # Send an intermediate update to the client
        environment.log_message(f'Progress for {params["name"]}: {i + 1}/10')
        if params["simulate_error"] and i == 5:
            environment.log_error('A simulated error occurred!')
            environment.log_message('Could not complete SAMPLE as it raised an error. You may want to try again.')
            try:
                sys.stdout.flush()
                sys.exit(1)  # Exit with an error code
            except SystemExit as e:
                print(f"SystemExit with code: {e.code}")
                os._exit(1)
    environment.log_message('Script completed! %s' % environment.current_data_task_name)

assert_dte_tools_available()
sample_task()
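When the DTE runs this script, the resolved parameters are supplied as a JSON command-line argument; run directly without arguments, the script logs "No parameters given!" and falls back to its built-in defaults.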